use std::time::Instant;
use super::ack::HandoffAckRegistry;
use super::fallback::{HandoffFallbackDecision, HandoffFallbackReason};
use super::handoff_token::{HandoffToken, HandoffTokenStore};
use super::orchestrate::HandoffDeliveryError;
use super::unix::{
try_send_scm_rights, ScmRightsAttempt, ScmRightsResult, ScmRightsSuccess, UnixFileDescriptor,
UnixHandoffSocket,
};
use super::AcknowledgedHandoff;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UnixHandoffRequest {
pub fd: UnixFileDescriptor,
pub backend_socket: UnixHandoffSocket,
pub token: HandoffToken,
}
impl UnixHandoffRequest {
pub fn new(
fd: UnixFileDescriptor,
backend_socket: UnixHandoffSocket,
token: HandoffToken,
) -> Self {
Self {
fd,
backend_socket,
token,
}
}
}
pub trait UnixHandoffAckWait {
fn await_backend_ack(
&mut self,
token: &HandoffToken,
deadline: Instant,
) -> Result<Instant, HandoffDeliveryError>;
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum UnixHandoffStage {
Send,
AwaitAck,
Acknowledge,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CompletedUnixHandoff {
pub sent: ScmRightsSuccess,
pub acknowledged: AcknowledgedHandoff,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UnixHandoffFallback {
pub stage: UnixHandoffStage,
pub decision: HandoffFallbackDecision,
pub broker_fd: UnixFileDescriptor,
pub fd_reached_backend: bool,
pub detail: String,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum UnixHandoffOutcome {
Completed(CompletedUnixHandoff),
FallbackToReconnect(UnixHandoffFallback),
}
impl UnixHandoffOutcome {
pub fn is_completed(&self) -> bool {
matches!(self, Self::Completed(_))
}
pub fn fallback(&self) -> Option<&UnixHandoffFallback> {
match self {
Self::Completed(_) => None,
Self::FallbackToReconnect(fallback) => Some(fallback),
}
}
}
pub fn execute_unix_handoff<W>(
tokens: &mut HandoffTokenStore,
acks: &mut HandoffAckRegistry,
request: &UnixHandoffRequest,
ack_wait: &mut W,
) -> UnixHandoffOutcome
where
W: UnixHandoffAckWait + ?Sized,
{
execute_unix_handoff_with_transport(tokens, acks, request, try_send_scm_rights, ack_wait)
}
pub fn execute_unix_handoff_with_transport<T, W>(
tokens: &mut HandoffTokenStore,
acks: &mut HandoffAckRegistry,
request: &UnixHandoffRequest,
transport: T,
ack_wait: &mut W,
) -> UnixHandoffOutcome
where
T: FnOnce(&ScmRightsAttempt) -> ScmRightsResult,
W: UnixHandoffAckWait + ?Sized,
{
let attempt = ScmRightsAttempt::new(request.fd, request.backend_socket.clone(), request.token);
let sent = match transport(&attempt) {
Ok(success) => success,
Err(error) => {
acks.abandon(tokens, &request.token);
return abandoned(
UnixHandoffStage::Send,
error.fallback_decision(),
request.fd,
false,
error.to_string(),
);
}
};
let deadline = ack_deadline_from(acks, Instant::now());
let acknowledged_at = match ack_wait.await_backend_ack(&request.token, deadline) {
Ok(at) => at,
Err(error) => {
acks.abandon(tokens, &request.token);
return abandoned(
UnixHandoffStage::AwaitAck,
HandoffFallbackDecision::new(HandoffFallbackReason::BackendAckTimeout),
request.fd,
true,
error.to_string(),
);
}
};
match acks.acknowledge(tokens, &request.token, acknowledged_at) {
Ok(acknowledged) => {
UnixHandoffOutcome::Completed(CompletedUnixHandoff { sent, acknowledged })
}
Err(error) => {
tokens.revoke(&request.token);
abandoned(
UnixHandoffStage::Acknowledge,
HandoffFallbackDecision::new(HandoffFallbackReason::BackendAckTimeout),
request.fd,
true,
error.to_string(),
)
}
}
}
fn abandoned(
stage: UnixHandoffStage,
decision: HandoffFallbackDecision,
broker_fd: UnixFileDescriptor,
fd_reached_backend: bool,
detail: String,
) -> UnixHandoffOutcome {
UnixHandoffOutcome::FallbackToReconnect(UnixHandoffFallback {
stage,
decision,
broker_fd,
fd_reached_backend,
detail,
})
}
fn ack_deadline_from(acks: &HandoffAckRegistry, now: Instant) -> Instant {
now.checked_add(acks.ack_deadline()).unwrap_or(now)
}