use chrono::{DateTime, Utc};
use concepts::{
ComponentType, ExecutionId, FunctionFqn, JoinSetId,
component_id::ComponentDigest,
prefixed_ulid::{DeploymentId, ExecutorId, RunId},
storage::{
DbErrorGeneric, DbErrorWrite, DbErrorWriteNonRetriable, ExecutionWithState, LockedBy,
PendingState, PendingStateBlockedByJoinSet, PendingStateFinished,
PendingStateFinishedResultKind, PendingStateLocked, PendingStatePaused,
PendingStatePendingAt, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED, STATE_LOCKED,
STATE_PENDING_AT, Version,
},
};
use std::panic::Location;
use tracing::debug;
use tracing_error::SpanTrace;
#[derive(Debug)]
pub struct CombinedStateDTO {
pub execution_id: ExecutionId,
pub state: String,
pub ffqn: FunctionFqn,
pub component_digest: ComponentDigest,
pub component_type: ComponentType,
pub deployment_id: DeploymentId,
pub created_at: DateTime<Utc>,
pub first_scheduled_at: DateTime<Utc>,
pub pending_expires_finished: DateTime<Utc>,
pub is_paused: bool,
pub last_lock_version: Option<Version>,
pub executor_id: Option<ExecutorId>,
pub run_id: Option<RunId>,
pub join_set_id: Option<JoinSetId>,
pub join_set_closing: Option<bool>,
pub result_kind: Option<PendingStateFinishedResultKind>,
}
#[derive(Debug)]
pub struct CombinedState {
pub execution_with_state: ExecutionWithState,
pub corresponding_version: Version,
}
impl CombinedState {
#[must_use]
pub fn get_next_version_assert_not_finished(&self) -> Version {
assert!(!self.execution_with_state.pending_state.is_finished());
self.corresponding_version.increment()
}
pub fn get_next_version_fail_if_finished(&self) -> Result<Version, DbErrorWrite> {
if self.execution_with_state.pending_state.is_finished() {
debug!("Execution is already finished");
return Err(DbErrorWrite::NonRetriable(
DbErrorWriteNonRetriable::AlreadyFinished,
));
}
Ok(self.corresponding_version.increment())
}
#[must_use]
pub fn get_next_version_or_finished(&self) -> Version {
if self.execution_with_state.pending_state.is_finished() {
self.corresponding_version.clone()
} else {
self.corresponding_version.increment()
}
}
pub fn new(
dto: CombinedStateDTO,
corresponding_version: Version,
) -> Result<Self, DbErrorGeneric> {
let execution_with_state = match dto {
CombinedStateDTO {
execution_id,
created_at,
first_scheduled_at,
state,
ffqn,
component_digest,
component_type,
deployment_id,
pending_expires_finished: scheduled_at,
last_lock_version: None,
executor_id: None,
run_id: None,
join_set_id: None,
join_set_closing: None,
result_kind: None,
is_paused: false,
} if state == STATE_PENDING_AT => ExecutionWithState {
component_digest,
component_type,
deployment_id,
execution_id,
ffqn,
created_at,
first_scheduled_at,
pending_state: PendingState::PendingAt(PendingStatePendingAt {
scheduled_at,
last_lock: None,
}),
},
CombinedStateDTO {
execution_id,
created_at,
first_scheduled_at,
state,
ffqn,
component_digest,
component_type,
deployment_id,
pending_expires_finished: scheduled_at,
last_lock_version: None,
executor_id: Some(executor_id),
run_id: Some(run_id),
join_set_id: None,
join_set_closing: None,
result_kind: None,
is_paused: false,
} if state == STATE_PENDING_AT => ExecutionWithState {
component_digest,
component_type,
deployment_id,
execution_id,
ffqn,
created_at,
first_scheduled_at,
pending_state: PendingState::PendingAt(PendingStatePendingAt {
scheduled_at,
last_lock: Some(LockedBy {
executor_id,
run_id,
}),
}),
},
CombinedStateDTO {
execution_id,
created_at,
first_scheduled_at,
state,
ffqn,
component_digest,
component_type,
deployment_id,
pending_expires_finished: lock_expires_at,
last_lock_version: Some(_),
executor_id: Some(executor_id),
run_id: Some(run_id),
join_set_id: None,
join_set_closing: None,
result_kind: None,
is_paused: false,
} if state == STATE_LOCKED => ExecutionWithState {
component_digest,
component_type,
deployment_id,
execution_id,
ffqn,
created_at,
first_scheduled_at,
pending_state: PendingState::Locked(PendingStateLocked {
locked_by: LockedBy {
executor_id,
run_id,
},
lock_expires_at,
}),
},
CombinedStateDTO {
execution_id,
created_at,
first_scheduled_at,
state,
ffqn,
component_digest,
component_type,
deployment_id,
pending_expires_finished: lock_expires_at,
last_lock_version: None,
executor_id: _,
run_id: _,
join_set_id: Some(join_set_id),
join_set_closing: Some(join_set_closing),
result_kind: None,
is_paused: false,
} if state == STATE_BLOCKED_BY_JOIN_SET => ExecutionWithState {
component_digest,
component_type,
deployment_id,
execution_id,
ffqn,
created_at,
first_scheduled_at,
pending_state: PendingState::BlockedByJoinSet(PendingStateBlockedByJoinSet {
join_set_id: join_set_id.clone(),
closing: join_set_closing,
lock_expires_at,
}),
},
CombinedStateDTO {
execution_id,
created_at,
first_scheduled_at,
state,
ffqn,
component_digest,
component_type,
deployment_id,
pending_expires_finished: finished_at,
last_lock_version: None,
executor_id: None,
run_id: None,
join_set_id: None,
join_set_closing: None,
result_kind: Some(result_kind),
is_paused: false,
} if state == STATE_FINISHED => ExecutionWithState {
component_digest,
component_type,
deployment_id,
execution_id,
ffqn,
created_at,
first_scheduled_at,
pending_state: PendingState::Finished(PendingStateFinished {
finished_at,
version: corresponding_version.0,
result_kind,
}),
},
CombinedStateDTO {
execution_id,
created_at,
first_scheduled_at,
state,
ffqn,
component_digest,
component_type,
deployment_id,
pending_expires_finished: scheduled_at,
last_lock_version: None,
executor_id: None,
run_id: None,
join_set_id: None,
join_set_closing: None,
result_kind: None,
is_paused: true,
} if state == STATE_PENDING_AT => ExecutionWithState {
component_digest,
component_type,
deployment_id,
execution_id,
ffqn,
created_at,
first_scheduled_at,
pending_state: PendingState::Paused(PendingStatePaused::PendingAt(
PendingStatePendingAt {
scheduled_at,
last_lock: None,
},
)),
},
CombinedStateDTO {
execution_id,
created_at,
first_scheduled_at,
state,
ffqn,
component_digest,
component_type,
deployment_id,
pending_expires_finished: scheduled_at,
last_lock_version: None,
executor_id: Some(executor_id),
run_id: Some(run_id),
join_set_id: None,
join_set_closing: None,
result_kind: None,
is_paused: true,
} if state == STATE_PENDING_AT => ExecutionWithState {
component_digest,
component_type,
deployment_id,
execution_id,
ffqn,
created_at,
first_scheduled_at,
pending_state: PendingState::Paused(PendingStatePaused::PendingAt(
PendingStatePendingAt {
scheduled_at,
last_lock: Some(LockedBy {
executor_id,
run_id,
}),
},
)),
},
CombinedStateDTO {
execution_id,
created_at,
first_scheduled_at,
state,
ffqn,
component_digest,
component_type,
deployment_id,
pending_expires_finished: lock_expires_at,
last_lock_version: Some(_),
executor_id: Some(executor_id),
run_id: Some(run_id),
join_set_id: None,
join_set_closing: None,
result_kind: None,
is_paused: true,
} if state == STATE_LOCKED => ExecutionWithState {
component_digest,
component_type,
deployment_id,
execution_id,
ffqn,
created_at,
first_scheduled_at,
pending_state: PendingState::Paused(PendingStatePaused::Locked(
PendingStateLocked {
locked_by: LockedBy {
executor_id,
run_id,
},
lock_expires_at,
},
)),
},
CombinedStateDTO {
execution_id,
created_at,
first_scheduled_at,
state,
ffqn,
component_digest,
component_type,
deployment_id,
pending_expires_finished: lock_expires_at,
last_lock_version: None,
executor_id: _,
run_id: _,
join_set_id: Some(join_set_id),
join_set_closing: Some(join_set_closing),
result_kind: None,
is_paused: true,
} if state == STATE_BLOCKED_BY_JOIN_SET => ExecutionWithState {
component_digest,
component_type,
deployment_id,
execution_id,
ffqn,
created_at,
first_scheduled_at,
pending_state: PendingState::Paused(PendingStatePaused::BlockedByJoinSet(
PendingStateBlockedByJoinSet {
join_set_id: join_set_id.clone(),
closing: join_set_closing,
lock_expires_at,
},
)),
},
dto => {
tracing::error!("Cannot deserialize pending state from {dto:?}");
return Err(DbErrorGeneric::Uncategorized {
reason: "invalid `t_state`".into(),
context: SpanTrace::capture(),
source: None,
loc: Location::caller(),
});
}
};
Ok(Self {
execution_with_state,
corresponding_version,
})
}
}