use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fmt;
use super::error::DomainError;
use super::location::LocationId;
use super::retry::{RetryPolicy, TransferErrorKind};
/// The operation a [`Transfer`] carries out.
///
/// Serde (de)serializes the variants in `snake_case` (`"sync"`, `"delete"`),
/// which matches the labels returned by [`TransferKind::as_str`].
/// `Sync` is the `Default` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum TransferKind {
/// Default kind; the one produced by [`Transfer::new`].
#[default]
Sync,
/// Kind produced by [`Transfer::new_delete`].
Delete,
}
impl TransferKind {
pub fn as_str(&self) -> &'static str {
match self {
Self::Sync => "sync",
Self::Delete => "delete",
}
}
}
/// Displays the kind as its [`TransferKind::as_str`] label.
impl fmt::Display for TransferKind {
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Written verbatim: width/padding flags on the formatter are not applied.
        let label = self.as_str();
        out.write_str(label)
    }
}
/// Parses the labels produced by [`TransferKind::as_str`].
impl std::str::FromStr for TransferKind {
    type Err = DomainError;

    /// Accepts exactly `"sync"` and `"delete"` (case-sensitive, no trimming).
    ///
    /// # Errors
    /// Any other input yields [`DomainError::Validation`] for the field
    /// `transfer_kind`, with the rejected text included in the reason.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let kind = match s {
            "sync" => Self::Sync,
            "delete" => Self::Delete,
            unknown => {
                return Err(DomainError::Validation {
                    field: "transfer_kind".into(),
                    reason: format!("unknown transfer kind: {unknown}"),
                });
            }
        };
        Ok(kind)
    }
}
/// Lifecycle state of a [`Transfer`].
///
/// Serde (de)serializes the variants in `snake_case`, matching the labels
/// returned by [`TransferState::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferState {
/// Initial state of a transfer created via [`Transfer::with_dependency`];
/// left through [`Transfer::unblock`].
Blocked,
/// Ready to start; the only state for which `is_actionable` is true.
Queued,
/// Started via [`Transfer::start`]; may complete, fail, or be cancelled.
InFlight,
/// Terminal: reached via [`Transfer::complete`] or [`Transfer::resolve`].
Completed,
/// Terminal: reached via [`Transfer::fail`]. A failed transfer can still be
/// resolved, or used as the template for a retry.
Failed,
/// Terminal: reached via [`Transfer::cancel`].
Cancelled,
}
impl TransferState {
pub fn as_str(&self) -> &'static str {
match self {
Self::Blocked => "blocked",
Self::Queued => "queued",
Self::InFlight => "in_flight",
Self::Completed => "completed",
Self::Failed => "failed",
Self::Cancelled => "cancelled",
}
}
pub fn is_actionable(&self) -> bool {
matches!(self, Self::Queued)
}
pub fn is_terminal(&self) -> bool {
matches!(self, Self::Completed | Self::Failed | Self::Cancelled)
}
}
/// Displays the state as its [`TransferState::as_str`] label.
impl fmt::Display for TransferState {
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Written verbatim: width/padding flags on the formatter are not applied.
        let label = self.as_str();
        out.write_str(label)
    }
}
/// Parses the labels produced by [`TransferState::as_str`].
impl std::str::FromStr for TransferState {
    type Err = DomainError;

    /// Accepts exactly the six `snake_case` state labels (case-sensitive, no trimming).
    ///
    /// # Errors
    /// Any other input yields [`DomainError::InvalidTransferState`] carrying
    /// the rejected text.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let state = match s {
            "blocked" => Self::Blocked,
            "queued" => Self::Queued,
            "in_flight" => Self::InFlight,
            "completed" => Self::Completed,
            "failed" => Self::Failed,
            "cancelled" => Self::Cancelled,
            unknown => return Err(DomainError::InvalidTransferState(unknown.to_string())),
        };
        Ok(state)
    }
}
/// A single transfer of one file between two locations, together with its
/// lifecycle state, failure details and timestamps.
///
/// Fields are private; state changes go through the transition methods on
/// `Transfer`, which enforce the allowed `TransferState` moves.
/// Optional fields are omitted from serialized output when `None` and default
/// to `None` when missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Transfer {
/// Identifier of this transfer; the constructors generate a UUID v4 string.
id: String,
/// Identifier of the file being transferred; constructors reject an empty value.
file_id: String,
/// Source location; constructors reject `src == dest`.
src: LocationId,
/// Destination location.
dest: LocationId,
/// Operation kind; falls back to `TransferKind::default()` when absent from the input.
#[serde(default)]
kind: TransferKind,
/// Current lifecycle state.
state: TransferState,
/// Error message recorded by `fail`.
#[serde(default, skip_serializing_if = "Option::is_none")]
error: Option<String>,
/// Error classification recorded by `fail`; consulted by the retry predicates.
#[serde(default, skip_serializing_if = "Option::is_none")]
error_kind: Option<TransferErrorKind>,
/// Attempt counter: 1 for a new transfer, incremented for each `retry`.
attempt: u32,
/// Creation time (UTC).
created_at: DateTime<Utc>,
/// Time `start` was called, if it has been.
#[serde(default, skip_serializing_if = "Option::is_none")]
started_at: Option<DateTime<Utc>>,
/// Time the transfer last entered `Completed`, `Failed` or `Cancelled`.
#[serde(default, skip_serializing_if = "Option::is_none")]
finished_at: Option<DateTime<Utc>>,
/// Dependency identifier supplied to `with_dependency`.
/// NOTE(review): presumably the `id` of another transfer — not enforced here; confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
depends_on: Option<String>,
}
impl Transfer {
pub fn new(file_id: String, src: LocationId, dest: LocationId) -> Result<Self, DomainError> {
Self::with_kind(file_id, src, dest, TransferKind::Sync)
}
pub fn new_delete(
file_id: String,
src: LocationId,
dest: LocationId,
) -> Result<Self, DomainError> {
Self::with_kind(file_id, src, dest, TransferKind::Delete)
}
pub fn with_kind(
file_id: String,
src: LocationId,
dest: LocationId,
kind: TransferKind,
) -> Result<Self, DomainError> {
if file_id.is_empty() {
return Err(DomainError::Validation {
field: "file_id".into(),
reason: "must not be empty".into(),
});
}
if src == dest {
return Err(DomainError::Validation {
field: "src/dest".into(),
reason: format!("self-transfer is not allowed: {src}"),
});
}
Ok(Self {
id: uuid::Uuid::new_v4().to_string(),
file_id,
src,
dest,
kind,
state: TransferState::Queued,
error: None,
error_kind: None,
attempt: 1,
created_at: Utc::now(),
started_at: None,
finished_at: None,
depends_on: None,
})
}
pub fn with_dependency(
file_id: String,
src: LocationId,
dest: LocationId,
kind: TransferKind,
depends_on_id: String,
) -> Result<Self, DomainError> {
if file_id.is_empty() {
return Err(DomainError::Validation {
field: "file_id".into(),
reason: "must not be empty".into(),
});
}
if src == dest {
return Err(DomainError::Validation {
field: "src/dest".into(),
reason: format!("self-transfer is not allowed: {src}"),
});
}
Ok(Self {
id: uuid::Uuid::new_v4().to_string(),
file_id,
src,
dest,
kind,
state: TransferState::Blocked,
error: None,
error_kind: None,
attempt: 1,
created_at: Utc::now(),
started_at: None,
finished_at: None,
depends_on: Some(depends_on_id),
})
}
#[cfg(test)]
#[allow(clippy::too_many_arguments)]
pub(crate) fn reconstitute(
id: String,
file_id: String,
src: LocationId,
dest: LocationId,
kind: TransferKind,
state: TransferState,
error: Option<String>,
error_kind: Option<TransferErrorKind>,
attempt: u32,
created_at: DateTime<Utc>,
started_at: Option<DateTime<Utc>>,
finished_at: Option<DateTime<Utc>>,
) -> Self {
Self {
id,
file_id,
src,
dest,
kind,
state,
error,
error_kind,
attempt,
created_at,
started_at,
finished_at,
depends_on: None,
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn reconstitute_with_dependency(
id: String,
file_id: String,
src: LocationId,
dest: LocationId,
kind: TransferKind,
state: TransferState,
error: Option<String>,
error_kind: Option<TransferErrorKind>,
attempt: u32,
created_at: DateTime<Utc>,
started_at: Option<DateTime<Utc>>,
finished_at: Option<DateTime<Utc>>,
depends_on: Option<String>,
) -> Self {
Self {
id,
file_id,
src,
dest,
kind,
state,
error,
error_kind,
attempt,
created_at,
started_at,
finished_at,
depends_on,
}
}
pub fn unblock(&mut self) -> Result<(), DomainError> {
if self.state != TransferState::Blocked {
return Err(DomainError::InvalidStateTransition {
from: self.state.as_str().to_string(),
to: "queued (unblock)".to_string(),
});
}
self.state = TransferState::Queued;
Ok(())
}
pub fn start(&mut self) -> Result<(), DomainError> {
if self.state != TransferState::Queued {
return Err(DomainError::InvalidStateTransition {
from: self.state.as_str().to_string(),
to: "in_flight".to_string(),
});
}
self.state = TransferState::InFlight;
self.started_at = Some(Utc::now());
Ok(())
}
pub fn complete(&mut self) -> Result<(), DomainError> {
if self.state != TransferState::InFlight {
return Err(DomainError::InvalidStateTransition {
from: self.state.as_str().to_string(),
to: "completed".to_string(),
});
}
self.state = TransferState::Completed;
self.finished_at = Some(Utc::now());
Ok(())
}
pub fn fail(&mut self, error: String, kind: TransferErrorKind) -> Result<(), DomainError> {
if self.state != TransferState::InFlight {
return Err(DomainError::InvalidStateTransition {
from: self.state.as_str().to_string(),
to: "failed".to_string(),
});
}
self.state = TransferState::Failed;
self.error = Some(error);
self.error_kind = Some(kind);
self.finished_at = Some(Utc::now());
Ok(())
}
pub fn resolve(&mut self) -> Result<(), DomainError> {
if self.state != TransferState::Failed {
return Err(DomainError::InvalidStateTransition {
from: self.state.as_str().to_string(),
to: "completed (resolve)".to_string(),
});
}
self.state = TransferState::Completed;
self.finished_at = Some(Utc::now());
Ok(())
}
pub fn cancel(&mut self) -> Result<(), DomainError> {
if self.state != TransferState::InFlight {
return Err(DomainError::InvalidStateTransition {
from: self.state.as_str().to_string(),
to: "cancelled".to_string(),
});
}
self.state = TransferState::Cancelled;
self.finished_at = Some(Utc::now());
Ok(())
}
pub fn retry(&self) -> Result<Self, DomainError> {
if self.state != TransferState::Failed {
return Err(DomainError::InvalidStateTransition {
from: self.state.as_str().to_string(),
to: "queued (retry)".to_string(),
});
}
Ok(Self {
id: uuid::Uuid::new_v4().to_string(),
file_id: self.file_id.clone(),
src: self.src.clone(),
dest: self.dest.clone(),
kind: self.kind,
state: TransferState::Queued,
error: None,
error_kind: None,
attempt: self.attempt.saturating_add(1),
created_at: Utc::now(),
started_at: None,
finished_at: None,
depends_on: None,
})
}
pub fn is_retryable(&self, policy: &RetryPolicy) -> bool {
self.state == TransferState::Failed
&& self.error_kind == Some(TransferErrorKind::Transient)
&& self.attempt < policy.max_attempts()
}
pub fn is_exhausted(&self, policy: &RetryPolicy) -> bool {
self.state == TransferState::Failed
&& (self.error_kind == Some(TransferErrorKind::Permanent)
|| self.attempt >= policy.max_attempts())
}
pub fn id(&self) -> &str {
&self.id
}
pub fn file_id(&self) -> &str {
&self.file_id
}
pub fn src(&self) -> &LocationId {
&self.src
}
pub fn dest(&self) -> &LocationId {
&self.dest
}
pub fn kind(&self) -> TransferKind {
self.kind
}
pub fn is_delete(&self) -> bool {
self.kind == TransferKind::Delete
}
pub fn state(&self) -> TransferState {
self.state
}
pub fn error(&self) -> Option<&str> {
self.error.as_deref()
}
pub fn error_kind(&self) -> Option<TransferErrorKind> {
self.error_kind
}
pub fn attempt(&self) -> u32 {
self.attempt
}
pub fn created_at(&self) -> DateTime<Utc> {
self.created_at
}
pub fn started_at(&self) -> Option<DateTime<Utc>> {
self.started_at
}
pub fn finished_at(&self) -> Option<DateTime<Utc>> {
self.finished_at
}
pub fn depends_on(&self) -> Option<&str> {
self.depends_on.as_deref()
}
pub fn to_value(&self) -> Result<serde_json::Value, serde_json::Error> {
serde_json::to_value(self)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `LocationId` from a literal the tests know to be valid.
    fn loc(s: &str) -> LocationId {
        LocationId::new(s).unwrap()
    }

    /// A fresh queued sync transfer `local -> cloud` for `file-1`.
    fn sample_transfer() -> Transfer {
        Transfer::new("file-1".into(), loc("local"), loc("cloud")).expect("valid test data")
    }

    /// A fresh blocked sync transfer `local -> cloud` depending on `parent-1`.
    fn blocked_transfer() -> Transfer {
        Transfer::with_dependency(
            "file-1".into(),
            loc("local"),
            loc("cloud"),
            TransferKind::Sync,
            "parent-1".into(),
        )
        .expect("valid test data")
    }

    /// A transfer already in `Failed` with a transient error at the given attempt.
    fn failed_transient(attempt: u32) -> Transfer {
        Transfer::reconstitute(
            uuid::Uuid::new_v4().to_string(),
            "file-1".into(),
            loc("local"),
            loc("cloud"),
            TransferKind::Sync,
            TransferState::Failed,
            Some("timeout".into()),
            Some(TransferErrorKind::Transient),
            attempt,
            Utc::now(),
            Some(Utc::now()),
            Some(Utc::now()),
        )
    }

    /// A transfer already in `Failed` with a permanent error on its first attempt.
    fn failed_permanent() -> Transfer {
        Transfer::reconstitute(
            uuid::Uuid::new_v4().to_string(),
            "file-1".into(),
            loc("local"),
            loc("cloud"),
            TransferKind::Sync,
            TransferState::Failed,
            Some("file not found".into()),
            Some(TransferErrorKind::Permanent),
            1,
            Utc::now(),
            Some(Utc::now()),
            Some(Utc::now()),
        )
    }

    #[test]
    fn new_creates_queued_transfer() {
        let t = sample_transfer();
        assert_eq!(t.state(), TransferState::Queued);
        assert_eq!(t.attempt(), 1);
        assert_eq!(t.file_id(), "file-1");
        assert_eq!(t.src(), &loc("local"));
        assert_eq!(t.dest(), &loc("cloud"));
        assert!(t.error().is_none());
        assert!(t.error_kind().is_none());
        assert!(t.started_at().is_none());
        assert!(t.finished_at().is_none());
    }

    #[test]
    fn start_transitions_to_in_flight() {
        let mut t = sample_transfer();
        t.start().unwrap();
        assert_eq!(t.state(), TransferState::InFlight);
        assert!(t.started_at().is_some());
    }

    #[test]
    fn complete_transitions_to_completed() {
        let mut t = sample_transfer();
        t.start().unwrap();
        t.complete().unwrap();
        assert_eq!(t.state(), TransferState::Completed);
        assert!(t.finished_at().is_some());
    }

    #[test]
    fn fail_transient_transitions_to_failed() {
        let mut t = sample_transfer();
        t.start().unwrap();
        t.fail("B2 timeout".into(), TransferErrorKind::Transient)
            .unwrap();
        assert_eq!(t.state(), TransferState::Failed);
        assert_eq!(t.error(), Some("B2 timeout"));
        assert_eq!(t.error_kind(), Some(TransferErrorKind::Transient));
        assert!(t.finished_at().is_some());
    }

    #[test]
    fn fail_permanent_transitions_to_failed() {
        let mut t = sample_transfer();
        t.start().unwrap();
        t.fail("file not found".into(), TransferErrorKind::Permanent)
            .unwrap();
        assert_eq!(t.state(), TransferState::Failed);
        assert_eq!(t.error_kind(), Some(TransferErrorKind::Permanent));
    }

    #[test]
    fn transient_within_limit_is_retryable() {
        let policy = RetryPolicy::new(3);
        let t = failed_transient(1);
        assert!(t.is_retryable(&policy));
        assert!(!t.is_exhausted(&policy));
    }

    #[test]
    fn transient_at_limit_is_exhausted() {
        let policy = RetryPolicy::new(3);
        let t = failed_transient(3);
        assert!(!t.is_retryable(&policy));
        assert!(t.is_exhausted(&policy));
    }

    #[test]
    fn permanent_is_never_retryable() {
        let policy = RetryPolicy::new(10);
        let t = failed_permanent();
        assert!(!t.is_retryable(&policy));
        assert!(t.is_exhausted(&policy));
    }

    #[test]
    fn queued_is_not_retryable() {
        let policy = RetryPolicy::default();
        let t = sample_transfer();
        assert!(!t.is_retryable(&policy));
        assert!(!t.is_exhausted(&policy));
    }

    #[test]
    fn retry_creates_new_queued_transfer() {
        let mut t = sample_transfer();
        t.start().unwrap();
        t.fail("error".into(), TransferErrorKind::Transient)
            .unwrap();
        let t2 = t.retry().unwrap();
        assert_eq!(t2.state(), TransferState::Queued);
        assert_eq!(t2.attempt(), 2);
        assert_eq!(t2.file_id(), "file-1");
        assert_eq!(t2.src(), &loc("local"));
        assert_eq!(t2.dest(), &loc("cloud"));
        assert!(t2.error().is_none());
        assert!(t2.error_kind().is_none());
        assert_ne!(t2.id(), t.id(), "retry must generate new id");
    }

    #[test]
    fn retry_preserves_original() {
        let mut t = sample_transfer();
        t.start().unwrap();
        t.fail("error".into(), TransferErrorKind::Transient)
            .unwrap();
        let original_id = t.id().to_string();
        let _ = t.retry().unwrap();
        assert_eq!(t.id(), original_id);
        assert_eq!(t.state(), TransferState::Failed);
        assert_eq!(t.error(), Some("error"));
        assert_eq!(t.error_kind(), Some(TransferErrorKind::Transient));
    }

    #[test]
    fn start_from_non_queued_fails() {
        let mut t = sample_transfer();
        t.start().unwrap();
        assert!(t.start().is_err(), "InFlight → InFlight is invalid");
    }

    #[test]
    fn complete_from_queued_fails() {
        let mut t = sample_transfer();
        assert!(t.complete().is_err(), "Queued → Completed is invalid");
    }

    #[test]
    fn fail_from_queued_fails() {
        let mut t = sample_transfer();
        assert!(
            t.fail("err".into(), TransferErrorKind::Transient).is_err(),
            "Queued → Failed is invalid"
        );
    }

    #[test]
    fn retry_from_non_failed_fails() {
        let t = sample_transfer();
        assert!(t.retry().is_err(), "Queued → retry is invalid");
    }

    #[test]
    fn retry_from_completed_fails() {
        let mut t = sample_transfer();
        t.start().unwrap();
        t.complete().unwrap();
        assert!(t.retry().is_err(), "Completed → retry is invalid");
    }

    #[test]
    fn state_roundtrip() {
        // Every variant is listed so a new state without a parser arm is caught here.
        for state in [
            TransferState::Blocked,
            TransferState::Queued,
            TransferState::InFlight,
            TransferState::Completed,
            TransferState::Failed,
            TransferState::Cancelled,
        ] {
            let s = state.as_str();
            let parsed: TransferState = s.parse().unwrap();
            assert_eq!(parsed, state);
        }
    }

    #[test]
    fn is_actionable() {
        assert!(TransferState::Queued.is_actionable());
        assert!(!TransferState::Blocked.is_actionable());
        assert!(!TransferState::InFlight.is_actionable());
        assert!(!TransferState::Completed.is_actionable());
        assert!(!TransferState::Failed.is_actionable());
    }

    #[test]
    fn serde_roundtrip() {
        let mut t = sample_transfer();
        t.start().unwrap();
        t.complete().unwrap();
        let json = serde_json::to_value(&t).unwrap();
        let restored: Transfer = serde_json::from_value(json).unwrap();
        assert_eq!(restored.state(), TransferState::Completed);
        assert_eq!(restored.file_id(), "file-1");
    }

    #[test]
    fn serde_roundtrip_failed_with_error_kind() {
        let mut t = sample_transfer();
        t.start().unwrap();
        t.fail("net err".into(), TransferErrorKind::Transient)
            .unwrap();
        let json = serde_json::to_value(&t).unwrap();
        let restored: Transfer = serde_json::from_value(json).unwrap();
        assert_eq!(restored.state(), TransferState::Failed);
        assert_eq!(restored.error_kind(), Some(TransferErrorKind::Transient));
    }

    #[test]
    fn new_rejects_empty_file_id() {
        let result = Transfer::new("".into(), loc("local"), loc("cloud"));
        assert!(result.is_err());
    }

    #[test]
    fn new_rejects_self_transfer() {
        let result = Transfer::new("file-1".into(), loc("local"), loc("local"));
        assert!(result.is_err());
    }

    #[test]
    fn resolve_transitions_failed_to_completed() {
        let mut t = failed_transient(3);
        assert_eq!(t.state(), TransferState::Failed);
        t.resolve().unwrap();
        assert_eq!(t.state(), TransferState::Completed);
        assert!(t.finished_at().is_some());
    }

    #[test]
    fn resolve_from_queued_fails() {
        let mut t = sample_transfer();
        assert!(
            t.resolve().is_err(),
            "Queued → Completed (resolve) is invalid"
        );
    }

    #[test]
    fn resolve_from_inflight_fails() {
        let mut t = sample_transfer();
        t.start().unwrap();
        assert!(
            t.resolve().is_err(),
            "InFlight → Completed (resolve) is invalid"
        );
    }

    #[test]
    fn resolve_from_completed_fails() {
        let mut t = sample_transfer();
        t.start().unwrap();
        t.complete().unwrap();
        assert!(
            t.resolve().is_err(),
            "Completed → Completed (resolve) is invalid"
        );
    }

    #[test]
    fn cancel_transitions_inflight_to_cancelled() {
        let mut t = sample_transfer();
        t.start().unwrap();
        assert_eq!(t.state(), TransferState::InFlight);
        t.cancel().unwrap();
        assert_eq!(t.state(), TransferState::Cancelled);
        assert!(t.finished_at().is_some());
    }

    #[test]
    fn cancel_from_queued_fails() {
        let mut t = sample_transfer();
        assert!(t.cancel().is_err(), "Queued → Cancelled is invalid");
    }

    #[test]
    fn cancel_from_completed_fails() {
        let mut t = sample_transfer();
        t.start().unwrap();
        t.complete().unwrap();
        assert!(t.cancel().is_err(), "Completed → Cancelled is invalid");
    }

    #[test]
    fn cancel_from_failed_fails() {
        let mut t = failed_transient(1);
        assert!(t.cancel().is_err(), "Failed → Cancelled is invalid");
    }

    #[test]
    fn cancelled_is_terminal() {
        assert!(TransferState::Cancelled.is_terminal());
        assert!(TransferState::Completed.is_terminal());
        assert!(TransferState::Failed.is_terminal());
        assert!(!TransferState::Queued.is_terminal());
        assert!(!TransferState::InFlight.is_terminal());
        assert!(!TransferState::Blocked.is_terminal());
    }

    #[test]
    fn cancelled_is_not_actionable() {
        assert!(!TransferState::Cancelled.is_actionable());
    }

    #[test]
    fn cancelled_roundtrip_serde() {
        let mut t = sample_transfer();
        t.start().unwrap();
        t.cancel().unwrap();
        let json = serde_json::to_value(&t).unwrap();
        let restored: Transfer = serde_json::from_value(json).unwrap();
        assert_eq!(restored.state(), TransferState::Cancelled);
    }

    #[test]
    fn cancelled_state_str_roundtrip() {
        let s = TransferState::Cancelled.as_str();
        assert_eq!(s, "cancelled");
        let parsed: TransferState = s.parse().unwrap();
        assert_eq!(parsed, TransferState::Cancelled);
    }

    #[test]
    fn with_dependency_creates_blocked_transfer() {
        let t = blocked_transfer();
        assert_eq!(t.state(), TransferState::Blocked);
        assert_eq!(t.depends_on(), Some("parent-1"));
        assert_eq!(t.attempt(), 1);
        assert_eq!(t.kind(), TransferKind::Sync);
        assert!(t.error().is_none());
        assert!(t.started_at().is_none());
        assert!(t.finished_at().is_none());
    }

    #[test]
    fn with_dependency_rejects_invalid_input() {
        let empty_file = Transfer::with_dependency(
            "".into(),
            loc("local"),
            loc("cloud"),
            TransferKind::Sync,
            "parent-1".into(),
        );
        assert!(empty_file.is_err());

        let self_transfer = Transfer::with_dependency(
            "file-1".into(),
            loc("local"),
            loc("local"),
            TransferKind::Sync,
            "parent-1".into(),
        );
        assert!(self_transfer.is_err());
    }

    #[test]
    fn unblock_transitions_blocked_to_queued() {
        let mut t = blocked_transfer();
        t.unblock().unwrap();
        assert_eq!(t.state(), TransferState::Queued);
        assert_eq!(t.depends_on(), Some("parent-1"));
    }

    #[test]
    fn unblock_from_queued_fails() {
        let mut t = sample_transfer();
        assert!(t.unblock().is_err(), "Queued → Queued (unblock) is invalid");
    }

    #[test]
    fn start_from_blocked_fails() {
        let mut t = blocked_transfer();
        assert!(t.start().is_err(), "Blocked → InFlight is invalid");
    }

    #[test]
    fn new_delete_sets_delete_kind() {
        let t = Transfer::new_delete("file-1".into(), loc("local"), loc("cloud"))
            .expect("valid test data");
        assert_eq!(t.kind(), TransferKind::Delete);
        assert!(t.is_delete());
        assert!(!sample_transfer().is_delete());
    }

    #[test]
    fn kind_str_roundtrip() {
        for kind in [TransferKind::Sync, TransferKind::Delete] {
            let parsed: TransferKind = kind.as_str().parse().unwrap();
            assert_eq!(parsed, kind);
            assert_eq!(kind.to_string(), kind.as_str());
        }
    }

    #[test]
    fn unknown_strings_are_rejected() {
        assert!("bogus".parse::<TransferKind>().is_err());
        assert!("bogus".parse::<TransferState>().is_err());
    }
}