use super::scheduler::{DISCONNECTED_WARNING, SchedulingDecision};
use super::*;
use tokio::sync::oneshot;
/// Layer identifier by name, as carried by [`SchedulerRequirement::LayerNameComplete`].
pub type LayerName = String;

/// Layer identifier by index, as carried by [`SchedulerRequirement::LayerComplete`].
pub type LayerIndex = u32;

/// Iteration counter carried by every [`SchedulerRequirement`] variant.
pub type Iteration = u64;
/// How a transfer request is meant to be dispatched.
///
/// Carried on both [`LeaderTransferRequest`] and [`WorkerTransferRequest`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RequestType {
    /// NOTE(review): presumably the transfer waits for a scheduler decision before it runs;
    /// confirm against the scheduler implementation.
    Scheduled,

    /// NOTE(review): presumably the transfer runs without waiting for a scheduler decision;
    /// confirm against the scheduler implementation.
    Immediate,
}
/// Kind of transfer a worker asks for.
///
/// Carried on [`WorkerTransferRequest`] and [`WorkerSchedulerRequest`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransferType {
    /// A load transfer.
    Load,

    /// A store transfer.
    Store,
}
/// Optional condition attached to a [`LeaderTransferRequest`].
///
/// NOTE(review): the variant names suggest each one names a point of progress that must be
/// reached before the transfer may proceed; the exact semantics live in the scheduler and
/// should be confirmed there.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SchedulerRequirement {
    /// Refers to a whole iteration.
    IterationComplete(Iteration),

    /// Refers to a layer, identified by name, within an iteration.
    LayerNameComplete(LayerName, Iteration),

    /// Refers to a layer, identified by index, within an iteration.
    LayerComplete(LayerIndex, Iteration),
}
/// Serializable description of a transfer as seen by the leader.
///
/// Wrapped in a [`TransferScheduleRequest`] when it is handed to the scheduler.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LeaderTransferRequest {
    /// Identifier of the request.
    pub request_id: String,

    /// UUID attached to this transfer.
    ///
    /// NOTE(review): presumably pairs this request with the [`WorkerTransferRequest`] carrying
    /// the same UUID; confirm against the caller.
    pub uuid: uuid::Uuid,

    /// Optional scheduling condition; `None` when no condition is attached.
    pub requirement: Option<SchedulerRequirement>,

    /// Dispatch mode of the request.
    pub request_type: RequestType,
}
/// Message type of the `tokio::sync::mpsc` channel whose sender is held by
/// [`ImmediateTransferCompletionHandle`].
pub enum TransferToSchedulerMessage {
    /// A transfer asking to be scheduled; carries the channel on which the resulting
    /// [`ScheduledTaskHandle`] is delivered.
    ScheduleRequest(TransferScheduleRequest),

    /// Completion status of an immediate transfer.
    ImmediateResult(ImmediateTransferResult),
}
/// Payload of [`TransferToSchedulerMessage::ScheduleRequest`].
pub struct TransferScheduleRequest {
    /// The leader-side description of the transfer to schedule.
    pub leader_request: LeaderTransferRequest,

    /// One-shot channel on which a [`ScheduledTaskHandle`] for this transfer is sent back.
    pub response_tx: oneshot::Sender<ScheduledTaskHandle>,
}
/// Handle on which a transfer waits for its scheduling decision.
///
/// Consumed by [`ScheduledTaskHandle::wait_for_decision`].
pub struct ScheduledTaskHandle {
    /// Receives the scheduling decision together with the one-shot sender used to report the
    /// transfer's completion status.
    pub decision_rx: oneshot::Receiver<(SchedulingDecision, oneshot::Sender<anyhow::Result<()>>)>,

    /// Token whose cancellation ends the wait for a decision.
    pub cancel_token: CancellationToken,
}
impl ScheduledTaskHandle {
pub async fn wait_for_decision(self) -> Box<dyn TransferCompletionHandle> {
tokio::select! {
Ok((decision, completion_tx)) = self.decision_rx => {
Box::new(ScheduledTransferCompletionHandle::new(decision, completion_tx))
}
_ = self.cancel_token.cancelled() => {
Box::new(CancelledTransferCompletionHandle)
}
}
}
}
/// Serializable description of a transfer as seen by a worker.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkerTransferRequest {
    /// Identifier of the request.
    pub request_id: String,

    /// UUID attached to this transfer.
    ///
    /// NOTE(review): presumably pairs this request with the [`LeaderTransferRequest`] carrying
    /// the same UUID; confirm against the caller.
    pub uuid: uuid::Uuid,

    /// Kind of transfer requested.
    pub transfer_type: TransferType,

    /// Dispatch mode of the request.
    pub request_type: RequestType,
}
/// Worker-side transfer request paired with a cancellation token.
///
/// Unlike [`WorkerTransferRequest`] this type is not serializable and has no `request_type`.
pub struct WorkerSchedulerRequest {
    /// Identifier of the request.
    pub request_id: String,

    /// UUID attached to this transfer.
    pub uuid: uuid::Uuid,

    /// Kind of transfer requested.
    pub transfer_type: TransferType,

    /// Token used to signal cancellation of this request.
    pub cancel_token: CancellationToken,
}
/// Handle through which a transfer learns its scheduling decision and reports its outcome.
///
/// # Policy
///
/// A handle must be explicitly marked with [`TransferCompletionHandle::mark_complete`] before
/// it is dropped. [`ScheduledTransferCompletionHandle`] and
/// [`ImmediateTransferCompletionHandle`] enforce this by panicking when dropped unmarked.
#[async_trait::async_trait]
pub trait TransferCompletionHandle: Send {
    /// Returns the scheduling decision associated with this handle.
    fn scheduler_decision(&self) -> SchedulingDecision;

    /// Reports the outcome of the transfer.
    ///
    /// `result` is `Ok(())` on success, or the error that made the transfer fail.
    async fn mark_complete(&self, result: anyhow::Result<()>);
}
/// Completion handle for a transfer that received a decision from the scheduler.
///
/// Built by [`ScheduledTaskHandle::wait_for_decision`].
pub struct ScheduledTransferCompletionHandle {
    /// Decision received from the scheduler; returned verbatim by `scheduler_decision`.
    scheduler_decision: SchedulingDecision,

    /// One-shot sender for the completion status. It holds `Some` until `mark_complete` takes
    /// it; the `Drop` impl treats a remaining `Some` as a policy violation.
    completion_tx: Mutex<Option<oneshot::Sender<anyhow::Result<()>>>>,
}
impl ScheduledTransferCompletionHandle {
    /// Builds an unmarked handle from a scheduling decision and the one-shot sender on which
    /// the completion status will later be reported.
    pub(crate) fn new(
        scheduler_decision: SchedulingDecision,
        completion_tx: oneshot::Sender<anyhow::Result<()>>,
    ) -> Self {
        // The sender starts out present; `mark_complete` takes it out exactly once.
        let pending_sender = Mutex::new(Some(completion_tx));

        Self {
            scheduler_decision,
            completion_tx: pending_sender,
        }
    }
}
#[async_trait::async_trait]
impl TransferCompletionHandle for ScheduledTransferCompletionHandle {
    /// Returns the decision this handle was created with.
    fn scheduler_decision(&self) -> SchedulingDecision {
        self.scheduler_decision
    }

    /// Sends `result` on the stored one-shot sender and marks the handle as complete.
    ///
    /// Only the first call sends anything; later calls find the sender already taken and do
    /// nothing. A failed send (receiver gone) is logged, not returned.
    async fn mark_complete(&self, result: anyhow::Result<()>) {
        // Take the sender out of the slot; the lock guard is released at the end of this
        // statement.
        let pending = self.completion_tx.lock().unwrap().take();

        let Some(sender) = pending else {
            return;
        };

        if sender.send(result).is_err() {
            tracing::error!(
                "failed to send completion status; this could lead to silent data corruption"
            );
        }
    }
}
impl Drop for ScheduledTransferCompletionHandle {
fn drop(&mut self) {
if self.completion_tx.lock().unwrap().is_some() {
panic!(concat!(
"logic error: implementation failed to respect the [TransferCompletionHandle] policy; ",
"handle dropped without being explicitly marked; this may lead to data corruption if ",
"the handle was dropped while a transfer was still in progress; please report immediately.",
));
}
}
}
/// Payload of [`TransferToSchedulerMessage::ImmediateResult`]; built by
/// [`ImmediateTransferCompletionHandle`] when it is marked complete.
pub struct ImmediateTransferResult {
    /// Identifier of the request the result belongs to.
    pub request_id: String,

    /// UUID of the transfer the result belongs to.
    pub uuid: uuid::Uuid,

    /// Outcome passed to `mark_complete`.
    pub status: anyhow::Result<()>,
}
/// Completion handle whose decision is always `SchedulingDecision::Execute` and whose outcome
/// is reported as a [`TransferToSchedulerMessage::ImmediateResult`].
pub struct ImmediateTransferCompletionHandle {
    /// Request identifier copied into the reported [`ImmediateTransferResult`].
    request_id: String,

    /// Transfer UUID copied into the reported [`ImmediateTransferResult`].
    uuid: uuid::Uuid,

    /// Channel sender for the result message. It holds `Some` until `mark_complete` takes it;
    /// the `Drop` impl treats a remaining `Some` as a policy violation.
    completion_tx: Mutex<Option<tokio::sync::mpsc::Sender<TransferToSchedulerMessage>>>,
}
impl ImmediateTransferCompletionHandle {
    /// Builds an unmarked handle for the transfer identified by `request_id` and `uuid`,
    /// reporting its outcome through `completion_tx`.
    pub(crate) fn new(
        request_id: String,
        uuid: uuid::Uuid,
        completion_tx: tokio::sync::mpsc::Sender<TransferToSchedulerMessage>,
    ) -> Self {
        // The sender starts out present; `mark_complete` takes it out exactly once.
        let pending_sender = Mutex::new(Some(completion_tx));

        Self {
            request_id,
            uuid,
            completion_tx: pending_sender,
        }
    }
}
#[async_trait::async_trait]
impl TransferCompletionHandle for ImmediateTransferCompletionHandle {
    /// Always `SchedulingDecision::Execute`.
    fn scheduler_decision(&self) -> SchedulingDecision {
        SchedulingDecision::Execute
    }

    /// Sends an [`ImmediateTransferResult`] carrying `result` on the stored channel and marks
    /// the handle as complete.
    ///
    /// Only the first call sends anything; later calls find the sender already taken and do
    /// nothing. A failed send (channel closed) is logged, not returned.
    async fn mark_complete(&self, result: anyhow::Result<()>) {
        // Take the sender in its own statement so the mutex guard is gone before the `.await`
        // below.
        let pending = self.completion_tx.lock().unwrap().take();

        let Some(sender) = pending else {
            return;
        };

        let message = TransferToSchedulerMessage::ImmediateResult(ImmediateTransferResult {
            request_id: self.request_id.clone(),
            uuid: self.uuid,
            status: result,
        });

        if sender.send(message).await.is_err() {
            tracing::error!(DISCONNECTED_WARNING);
        }
    }
}
impl Drop for ImmediateTransferCompletionHandle {
fn drop(&mut self) {
if self.completion_tx.lock().unwrap().is_some() {
panic!(concat!(
"logic error: implementation failed to respect the [TransferCompletionHandle] policy; ",
"handle dropped without being explicitly marked; this may lead to data corruption if ",
"the handle was dropped while a transfer was still in progress; please report immediately.",
));
}
}
}
/// Completion handle returned by [`ScheduledTaskHandle::wait_for_decision`] when the wait ends
/// through cancellation.
///
/// It carries no state: its decision is always `SchedulingDecision::Cancel` and marking it
/// complete has no effect.
pub struct CancelledTransferCompletionHandle;

#[async_trait::async_trait]
impl TransferCompletionHandle for CancelledTransferCompletionHandle {
    /// Always `SchedulingDecision::Cancel`.
    fn scheduler_decision(&self) -> SchedulingDecision {
        SchedulingDecision::Cancel
    }

    /// Does nothing; the supplied result is discarded.
    async fn mark_complete(&self, _result: anyhow::Result<()>) {
        // Intentionally empty: this handle has no channel to report on.
    }
}