use std::time::Instant;
use super::ack::{HandoffAckRegistry, PendingHandoffBackend};
use super::fallback::{HandoffFallbackDecision, HandoffFallbackReason};
use super::handoff_token::{HandoffToken, HandoffTokenStore};
use super::windows::{
try_duplicate_handle, DuplicateHandleAttempt, DuplicateHandleResult, DuplicateHandleSuccess,
WindowsHandleValue,
};
use super::AcknowledgedHandoff;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct WindowsHandoffRequest {
pub pipe_handle: WindowsHandleValue,
pub backend_pid: u32,
pub token: HandoffToken,
}
impl WindowsHandoffRequest {
pub fn new(pipe_handle: WindowsHandleValue, backend_pid: u32, token: HandoffToken) -> Self {
Self {
pipe_handle,
backend_pid,
token,
}
}
}
pub trait HandoffDelivery {
fn deliver(
&mut self,
handle: WindowsHandleValue,
token: &HandoffToken,
) -> Result<(), HandoffDeliveryError>;
fn await_backend_ack(
&mut self,
token: &HandoffToken,
deadline: Instant,
) -> Result<Instant, HandoffDeliveryError>;
}
#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
pub enum HandoffDeliveryError {
#[error("handoff delivery to backend failed: {detail}")]
DeliveryFailed {
detail: String,
},
#[error("backend handoff ACK was not observed: {detail}")]
AckNotObserved {
detail: String,
},
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WindowsHandoffStage {
Duplicate,
Deliver,
AwaitAck,
Acknowledge,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CompletedWindowsHandoff {
pub duplicated: DuplicateHandleSuccess,
pub acknowledged: AcknowledgedHandoff,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WindowsHandoffFallback {
pub stage: WindowsHandoffStage,
pub decision: HandoffFallbackDecision,
pub leaked_backend_handle: Option<WindowsHandleValue>,
pub detail: String,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WindowsHandoffOutcome {
Completed(CompletedWindowsHandoff),
FallbackToReconnect(WindowsHandoffFallback),
}
impl WindowsHandoffOutcome {
pub fn is_completed(&self) -> bool {
matches!(self, Self::Completed(_))
}
pub fn fallback(&self) -> Option<&WindowsHandoffFallback> {
match self {
Self::Completed(_) => None,
Self::FallbackToReconnect(fallback) => Some(fallback),
}
}
}
pub fn execute_windows_handoff<D>(
tokens: &mut HandoffTokenStore,
acks: &mut HandoffAckRegistry,
request: &WindowsHandoffRequest,
delivery: &mut D,
) -> WindowsHandoffOutcome
where
D: HandoffDelivery + ?Sized,
{
execute_windows_handoff_with_transport(tokens, acks, request, try_duplicate_handle, delivery)
}
#[cfg(windows)]
pub fn execute_verified_windows_handoff<D>(
backend: &crate::broker::backend_handle::BackendHandle,
pipe_handle: WindowsHandleValue,
token: HandoffToken,
tokens: &mut HandoffTokenStore,
acks: &mut HandoffAckRegistry,
delivery: &mut D,
) -> WindowsHandoffOutcome
where
D: HandoffDelivery + ?Sized,
{
let request = WindowsHandoffRequest::new(pipe_handle, backend.daemon_process.pid, token);
execute_windows_handoff_with_transport(
tokens,
acks,
&request,
|attempt| {
backend.try_duplicate_windows_handoff_handle(attempt.pipe_handle, attempt.handoff_token)
},
delivery,
)
}
pub fn execute_windows_handoff_with_transport<T, D>(
tokens: &mut HandoffTokenStore,
acks: &mut HandoffAckRegistry,
request: &WindowsHandoffRequest,
transport: T,
delivery: &mut D,
) -> WindowsHandoffOutcome
where
T: FnOnce(&DuplicateHandleAttempt) -> DuplicateHandleResult,
D: HandoffDelivery + ?Sized,
{
let attempt =
DuplicateHandleAttempt::new(request.pipe_handle, request.backend_pid, request.token);
let duplicated = match transport(&attempt) {
Ok(success) => success,
Err(error) => {
abandon_pending(acks, tokens, &request.token);
return abandoned(
WindowsHandoffStage::Duplicate,
error.fallback_decision(),
None,
error.to_string(),
);
}
};
let backend_handle = duplicated.duplicated_handle;
if let Err(error) = delivery.deliver(backend_handle, &request.token) {
abandon_pending(acks, tokens, &request.token);
return abandoned(
WindowsHandoffStage::Deliver,
HandoffFallbackDecision::new(HandoffFallbackReason::BackendAckTimeout),
Some(backend_handle),
error.to_string(),
);
}
let deadline = ack_deadline_from(acks, Instant::now());
let acknowledged_at = match delivery.await_backend_ack(&request.token, deadline) {
Ok(at) => at,
Err(error) => {
abandon_pending(acks, tokens, &request.token);
return abandoned(
WindowsHandoffStage::AwaitAck,
HandoffFallbackDecision::new(HandoffFallbackReason::BackendAckTimeout),
Some(backend_handle),
error.to_string(),
);
}
};
match acks.acknowledge(tokens, &request.token, acknowledged_at) {
Ok(acknowledged) => WindowsHandoffOutcome::Completed(CompletedWindowsHandoff {
duplicated,
acknowledged,
}),
Err(error) => {
tokens.revoke(&request.token);
abandoned(
WindowsHandoffStage::Acknowledge,
HandoffFallbackDecision::new(HandoffFallbackReason::BackendAckTimeout),
Some(backend_handle),
error.to_string(),
)
}
}
}
fn abandon_pending(
acks: &mut HandoffAckRegistry,
tokens: &mut HandoffTokenStore,
token: &HandoffToken,
) -> Option<PendingHandoffBackend> {
acks.abandon(tokens, token)
}
fn abandoned(
stage: WindowsHandoffStage,
decision: HandoffFallbackDecision,
leaked_backend_handle: Option<WindowsHandleValue>,
detail: String,
) -> WindowsHandoffOutcome {
WindowsHandoffOutcome::FallbackToReconnect(WindowsHandoffFallback {
stage,
decision,
leaked_backend_handle,
detail,
})
}
fn ack_deadline_from(acks: &HandoffAckRegistry, now: Instant) -> Instant {
now.checked_add(acks.ack_deadline()).unwrap_or(now)
}