use std::sync::Mutex;
use std::time::Instant;
use prost::Message;
use crate::broker::capabilities::CAP_HANDLE_PASSING;
use crate::broker::client::connect_local_socket;
use crate::broker::protocol::{
hello_reply::Result as HelloReplyResult, write_frame, HandoffAck, HelloReply, Negotiated,
};
use super::backend_registry::BackendRegistry;
use super::handoff::{
handoff_ready_frame, HandoffAckRegistry, HandoffToken, HandoffTokenStore,
PendingHandoffBackend, WireHandoffDelivery, HANDOFF_TOKEN_BYTES,
};
use super::instance::BrokerInstanceKey;
pub struct ServeHandoffContext<'a> {
pub handoff_endpoint: &'a str,
pub service_name: &'a str,
pub service_version: &'a str,
pub instance: &'a BrokerInstanceKey,
pub registry: &'a Mutex<BackendRegistry>,
}
pub fn complete_negotiated_handoff(
ctx: &ServeHandoffContext<'_>,
client_stream: &mut interprocess::local_socket::Stream,
reply: &HelloReply,
) {
let Some(negotiated) = negotiated_with_handoff(reply) else {
return;
};
let Ok(token_bytes) =
<[u8; HANDOFF_TOKEN_BYTES]>::try_from(negotiated.handle_passed_token.as_slice())
else {
return;
};
let now = Instant::now();
let mut tokens = HandoffTokenStore::new();
let mut acks = HandoffAckRegistry::new();
let issued = match tokens.issue_with_random128(now, || Ok(token_bytes)) {
Ok(issued) => issued,
Err(error) => {
log_handoff_fallback(&format!("failed to re-seed issued token: {error}"));
return;
}
};
acks.register(
issued,
PendingHandoffBackend::for_service(ctx.service_name),
now,
);
let backend_stream = match connect_local_socket(ctx.handoff_endpoint) {
Ok(stream) => stream,
Err(error) => {
acks.abandon(&mut tokens, &issued);
log_handoff_fallback(&format!(
"failed to dial backend handoff endpoint {}: {error}",
ctx.handoff_endpoint
));
return;
}
};
{
use interprocess::local_socket::traits::Stream as _;
let _ = backend_stream.set_recv_timeout(Some(acks.ack_deadline()));
}
let mut delivery =
WireHandoffDelivery::new(backend_stream, ctx.service_name, negotiated.connection_id);
if !run_platform_handoff(
ctx,
&*client_stream,
issued,
&mut tokens,
&mut acks,
&mut delivery,
) {
return;
}
let ack = HandoffAck {
token: token_bytes.to_vec(),
accepted: true,
error_detail: String::new(),
correlation_id: negotiated.connection_id,
};
let frame = handoff_ready_frame(&ack);
if let Err(error) = write_frame(client_stream, &frame.encode_to_vec()) {
log_handoff_fallback(&format!(
"completed handoff but failed to relay handoff-ready event to client: {error}"
));
}
}
fn negotiated_with_handoff(reply: &HelloReply) -> Option<&Negotiated> {
let HelloReplyResult::Negotiated(negotiated) = reply.result.as_ref()? else {
return None;
};
if negotiated.server_capabilities & CAP_HANDLE_PASSING == 0
|| negotiated.handle_passed_token.is_empty()
{
return None;
}
Some(negotiated)
}
#[cfg(windows)]
fn run_platform_handoff(
ctx: &ServeHandoffContext<'_>,
client_stream: &interprocess::local_socket::Stream,
issued: HandoffToken,
tokens: &mut HandoffTokenStore,
acks: &mut HandoffAckRegistry,
delivery: &mut WireHandoffDelivery<interprocess::local_socket::Stream>,
) -> bool {
use std::os::windows::io::{AsHandle, AsRawHandle};
use super::handoff::{
execute_verified_windows_handoff, WindowsHandleValue, WindowsHandoffOutcome,
};
let pipe_handle = match client_stream {
interprocess::local_socket::Stream::NamedPipe(stream) => {
WindowsHandleValue::new(stream.as_handle().as_raw_handle() as usize)
}
};
let registry = ctx
.registry
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let Some(backend) = registry.get(ctx.instance, ctx.service_name, ctx.service_version) else {
acks.abandon(tokens, &issued);
log_handoff_fallback("registered backend disappeared before handoff delivery");
return false;
};
match execute_verified_windows_handoff(backend, pipe_handle, issued, tokens, acks, delivery) {
WindowsHandoffOutcome::Completed(_) => true,
WindowsHandoffOutcome::FallbackToReconnect(fallback) => {
let leak = match fallback.leaked_backend_handle {
Some(handle) => format!(
"; duplicated handle {:#x} leaks in backend pid {} until it exits",
handle.get(),
backend.daemon_process.pid
),
None => String::new(),
};
log_handoff_fallback(&format!(
"abandoned at {:?} stage: {}{leak}",
fallback.stage, fallback.detail
));
false
}
}
}
#[cfg(unix)]
fn run_platform_handoff(
ctx: &ServeHandoffContext<'_>,
client_stream: &interprocess::local_socket::Stream,
issued: HandoffToken,
tokens: &mut HandoffTokenStore,
acks: &mut HandoffAckRegistry,
delivery: &mut WireHandoffDelivery<interprocess::local_socket::Stream>,
) -> bool {
use std::cell::RefCell;
use std::os::fd::{AsFd, AsRawFd};
use std::time::Instant;
use super::handoff::{
execute_unix_handoff_with_transport, try_send_scm_rights_over, HandoffDelivery,
HandoffDeliveryError, ScmRightsAttempt, ScmRightsError, ScmRightsResult,
UnixFileDescriptor, UnixHandoffAckWait, UnixHandoffOutcome, UnixHandoffRequest,
UnixHandoffSocket, WindowsHandleValue,
};
let client_fd = match client_stream {
interprocess::local_socket::Stream::UdSocket(stream) => stream.as_fd().as_raw_fd(),
};
let backend_fd = match delivery.stream() {
interprocess::local_socket::Stream::UdSocket(stream) => stream.as_fd().as_raw_fd(),
};
let request = UnixHandoffRequest::new(
UnixFileDescriptor::new(client_fd),
UnixHandoffSocket::new(ctx.handoff_endpoint),
issued,
);
let delivery = RefCell::new(delivery);
let transport = |attempt: &ScmRightsAttempt| -> ScmRightsResult {
let mut delivery = delivery.borrow_mut();
let sent = try_send_scm_rights_over(backend_fd, attempt)?;
delivery
.deliver(WindowsHandleValue::new(0), &attempt.handoff_token)
.map_err(|error| {
log_handoff_fallback(&format!("failed to write HandoffOffer frame: {error}"));
ScmRightsError::SendFailed {
fd: attempt.fd.raw(),
socket: attempt.backend_socket.path.clone(),
raw_os_error: None,
}
})?;
Ok(sent)
};
struct DeliveryAckWait<'a, 'b> {
delivery: &'a RefCell<&'b mut WireHandoffDelivery<interprocess::local_socket::Stream>>,
}
impl UnixHandoffAckWait for DeliveryAckWait<'_, '_> {
fn await_backend_ack(
&mut self,
token: &HandoffToken,
deadline: Instant,
) -> Result<Instant, HandoffDeliveryError> {
self.delivery
.borrow_mut()
.await_backend_ack(token, deadline)
}
}
let mut ack_wait = DeliveryAckWait {
delivery: &delivery,
};
match execute_unix_handoff_with_transport(tokens, acks, &request, transport, &mut ack_wait) {
UnixHandoffOutcome::Completed(_) => true,
UnixHandoffOutcome::FallbackToReconnect(fallback) => {
let reached = if fallback.fd_reached_backend {
"; a duplicated descriptor already reached the backend and lives until it closes it"
} else {
""
};
log_handoff_fallback(&format!(
"abandoned at {:?} stage: {}{reached}",
fallback.stage, fallback.detail
));
false
}
}
}
fn log_handoff_fallback(detail: &str) {
eprintln!("running-process-broker: handoff fallback: {detail}");
}