#![doc = include_str!("../README.md")]
#[cfg(feature = "tokio_ecdysis")]
pub mod tokio_ecdysis;
mod executioner;
mod inheriter;
mod listener;
mod registry;
#[cfg(target_os = "linux")]
#[cfg(feature = "tokio_ecdysis")]
mod seqpacket;
mod utils;
use std::{
io,
io::Write,
net::{SocketAddr, TcpListener, UdpSocket},
os::unix::{
io::{AsRawFd, FromRawFd},
net::{UnixDatagram, UnixListener},
},
path::{Path, PathBuf},
process::exit,
};
use socket2::Socket;
#[cfg(target_os = "linux")]
#[cfg(feature = "tokio_ecdysis")]
pub use tokio_seqpacket::UnixSeqpacketListener;
use executioner::{upgrade, UpgradeFinished};
use inheriter::{init_child, InheritError};
use registry::{ListenerRegistry, SockInfo};
pub use crate::{listener::Listener, registry::ListenerInfo};
#[cfg(feature = "systemd_notify")]
pub use tokio_ecdysis::systemd_notify::{SystemdNotifier, SystemdNotifierError};
#[cfg(feature = "systemd_sockets")]
pub use tokio_ecdysis::systemd_sockets::{SystemdSocketError, SystemdSocketsReadError};
/// Entry point of the crate's upgrade flow.
///
/// Tracks sockets through a `ListenerRegistry` so that they can be picked up
/// from the environment prepared for an upgraded process (see [`Ecdysis::new`])
/// and handed on to the next process started by [`Ecdysis::upgrade`].
pub struct Ecdysis {
/// Socket registry: consulted for inherited fds and extended with newly created sockets.
registry: ListenerRegistry,
/// Pipe on which [`Ecdysis::ready`] writes `OK`; only `Some` when `init_child` succeeded in `new`.
ready_notifier: Option<os_pipe::PipeWriter>,
/// Path passed to `utils::write_pid_file`; `None` means no pid file is written.
pid_file: Option<PathBuf>,
/// `true` when `new` initialised this instance from an upgrade environment.
child: bool,
}
impl Default for Ecdysis {
fn default() -> Self {
Self::new()
}
}
impl Ecdysis {
/// Builds an `Ecdysis`, detecting whether this process was started as part of an upgrade.
///
/// * If `init_child` succeeds, the registry is seeded from the inherited fds, the
///   returned pipe is kept as the ready notifier and the instance is marked as a child.
/// * If it reports `InheritError::NotAnUpgrade`, a new registry is created and the
///   instance is a non-child without a ready notifier.
/// * Any other error is logged and the process exits with status `1`.
///
/// No pid file is configured initially; see [`Ecdysis::set_pid_file`].
pub fn new() -> Self {
    // Each arm yields the three pieces of state that depend on the start-up mode.
    let (registry, ready_notifier, child) = match init_child() {
        Ok((fds, ready_pipe)) => (
            ListenerRegistry::from_inherited(fds),
            Some(ready_pipe),
            true,
        ),
        Err(InheritError::NotAnUpgrade) => (ListenerRegistry::new(), None, false),
        Err(e) => {
            log::error!("Fatal: Upgrade environment problem - {e}");
            exit(1)
        }
    };

    Self {
        registry,
        ready_notifier,
        pid_file: None,
        child,
    }
}
/// Returns `true` if [`Ecdysis::new`] initialised this instance from an upgrade
/// environment (`init_child` returned `Ok`), and `false` otherwise.
pub fn is_child(&self) -> bool {
self.child
}
pub fn set_pid_file<P: AsRef<Path>>(&mut self, pid_file: P) {
self.pid_file = Some(PathBuf::from(pid_file.as_ref()))
}
/// Writes the pid file through `utils::write_pid_file` when a path is configured.
///
/// Returns `Ok(())` without doing anything if no pid file path has been set.
fn write_pidfile(&self) -> io::Result<()> {
    if let Some(path) = &self.pid_file {
        return utils::write_pid_file(path);
    }
    Ok(())
}
pub fn ready(&mut self) -> io::Result<()> {
self.registry.close_inherited();
let _ = self.write_pidfile();
if let Some(mut notifier) = self.ready_notifier.take() {
notifier.write_all(b"OK")
} else {
Ok(())
}
}
/// Starts an upgrade, passing the registry's fds for the child to `executioner::upgrade`.
///
/// Returns whatever `executioner::upgrade` returns. When it fails, the error is
/// logged and the pid file (if configured) is written again before the error is
/// handed back to the caller.
/// NOTE(review): the rewrite presumably restores this process's pid after a
/// failed child may have replaced it — confirm against `utils::write_pid_file`.
///
/// A failure to rewrite the pid file is logged as a warning; it never masks
/// the original upgrade error.
pub fn upgrade(&self) -> UpgradeFinished {
    let fds = self.registry.get_fds_for_child();
    log::warn!("Ecdysis starting upgrade");
    upgrade(fds).map_err(|e| {
        log::warn!("Upgrade failed! - {e}");
        // Best-effort, but report the failure instead of discarding it.
        if let Err(pid_err) = self.write_pidfile() {
            log::warn!("Failed to rewrite pid file after failed upgrade - {pid_err}");
        }
        e
    })
}
/// Delegates to `ListenerRegistry::close_used`.
/// NOTE(review): presumably closes the registry's handles to the sockets this
/// process has been using — confirm against the registry implementation.
pub fn quit(&self) {
self.registry.close_used()
}
/// Returns a `UnixListener` for `path`, reusing an inherited socket when the
/// registry has one registered for that path.
///
/// If nothing was inherited, a new listener is bound to `path` and its info is
/// added to the registry.
///
/// # Errors
///
/// Propagates errors from binding the socket, from `Listener::info`, and from
/// adding the listener to the registry.
pub fn listen_unix<P>(&self, path: P) -> io::Result<UnixListener>
where
    P: AsRef<Path> + std::fmt::Debug,
{
    log::debug!("Creating unix listener at: {:?}", path);

    let wanted = SockInfo::Unix(Some(path.as_ref().into()));
    if let Some(fd) = self.registry.inherit(wanted) {
        log::debug!("Found existing fd, opening");
        // SAFETY: relies on the registry handing out an open fd for a unix
        // listener that nothing else owns — TODO confirm against `ListenerRegistry::inherit`.
        return Ok(unsafe { UnixListener::from_raw_fd(fd) });
    }

    log::debug!("Does not exist, creating new");
    let fresh = UnixListener::bind(path)?;
    self.registry.add(fresh.info()?)?;
    Ok(fresh)
}
/// Returns a `UnixSeqpacketListener` for `path`, reusing an inherited socket
/// when the registry has one registered for that path.
///
/// If nothing was inherited, a new listener is bound to `path` and its info is
/// added to the registry. Only compiled on Linux with the `tokio_ecdysis` feature.
///
/// # Errors
///
/// Propagates errors from wrapping the inherited fd, from binding a new
/// socket, from `Listener::info`, and from adding the listener to the registry.
#[cfg(target_os = "linux")]
#[cfg(feature = "tokio_ecdysis")]
fn listen_unix_seqpacket<P>(&self, path: P) -> io::Result<UnixSeqpacketListener>
where
    P: AsRef<Path> + std::fmt::Debug,
{
    log::debug!("Creating unix seqpacket listener at: {:?}", path);

    let wanted = SockInfo::UnixSeqpacket(Some(path.as_ref().into()));
    if let Some(fd) = self.registry.inherit(wanted) {
        log::debug!("Found existing fd, opening");
        // SAFETY: relies on the registry handing out an open fd for a seqpacket
        // listener that nothing else owns — TODO confirm against `ListenerRegistry::inherit`.
        let inherited = unsafe { UnixSeqpacketListener::from_raw_fd(fd) }?;
        return Ok(inherited);
    }

    log::debug!("Does not exist, creating new");
    let fresh = UnixSeqpacketListener::bind(path)?;
    self.registry.add(fresh.info()?)?;
    Ok(fresh)
}
/// Returns a `TcpListener` for `addr` using the default socket setup.
///
/// Convenience wrapper around [`Ecdysis::build_listen_tcp`]: when a new socket
/// has to be created it is bound to `addr` and put into listening mode with a
/// backlog of 128.
///
/// # Errors
///
/// Propagates any error from [`Ecdysis::build_listen_tcp`] or from binding /
/// listening on the new socket.
pub fn listen_tcp(&self, addr: SocketAddr) -> io::Result<TcpListener> {
    self.build_listen_tcp(addr, |socket, bind_addr| {
        socket.bind(&bind_addr.into())?;
        socket.listen(128)?;
        Ok(TcpListener::from(socket))
    })
}
/// Returns a `TcpListener` for `addr`, letting the caller customise how a new
/// socket is set up.
///
/// If the registry has an inherited fd for `addr`, that fd is wrapped and
/// `sock_build` is never called. Otherwise a stream socket of the address
/// family matching `addr` is created and passed to `sock_build` together with
/// `addr`; the resulting listener's info is then added to the registry.
///
/// # Errors
///
/// Propagates errors from socket creation, from `sock_build`, from
/// `Listener::info`, and from adding the listener to the registry.
pub fn build_listen_tcp<F>(&self, addr: SocketAddr, sock_build: F) -> io::Result<TcpListener>
where
    F: FnOnce(Socket, SocketAddr) -> io::Result<TcpListener>,
{
    log::debug!("Creating TCP listener on: {:?}", addr);

    let inherited = self.registry.inherit(SockInfo::Tcp(addr));
    let listener = if let Some(fd) = inherited {
        log::debug!("Found existing TCP fd, opening");
        // SAFETY: relies on the registry handing out an open fd for a TCP
        // listener that nothing else owns — TODO confirm against `ListenerRegistry::inherit`.
        unsafe { TcpListener::from_raw_fd(fd) }
    } else {
        log::debug!("TCP fd does not exist, creating new");
        let domain = socket2::Domain::for_address(addr);
        let raw_socket = Socket::new(domain, socket2::Type::STREAM, None)?;
        let built = sock_build(raw_socket, addr)?;
        self.registry.add(built.info()?)?;
        built
    };

    log::debug!(
        "set up listener, now have registry of:\n {:?}",
        self.registry.get_fds_for_child()
    );
    Ok(listener)
}
/// Returns a `UdpSocket` for `addr`, letting the caller customise how a new
/// socket is set up.
///
/// If the registry has an inherited fd for `addr`, that fd is wrapped and
/// `sock_build` is never called. Otherwise a datagram socket of the address
/// family matching `addr` is created and passed to `sock_build` together with
/// `addr`; the resulting socket's info is then added to the registry.
///
/// # Errors
///
/// Propagates errors from socket creation, from `sock_build`, from
/// `Listener::info`, and from adding the socket to the registry.
pub fn build_socket_udp<F>(&self, addr: SocketAddr, sock_build: F) -> io::Result<UdpSocket>
where
    F: FnOnce(Socket, SocketAddr) -> io::Result<UdpSocket>,
{
    log::debug!("Creating UDP socket on: {:?}", addr);

    let inherited = self.registry.inherit(SockInfo::Udp(addr));
    let socket = if let Some(fd) = inherited {
        log::debug!("Found existing UDP fd, opening");
        // SAFETY: relies on the registry handing out an open fd for a UDP
        // socket that nothing else owns — TODO confirm against `ListenerRegistry::inherit`.
        unsafe { UdpSocket::from_raw_fd(fd) }
    } else {
        log::debug!("UDP fd does not exist, creating new");
        let domain = socket2::Domain::for_address(addr);
        let raw_socket = Socket::new(domain, socket2::Type::DGRAM, None)?;
        let built = sock_build(raw_socket, addr)?;
        self.registry.add(built.info()?)?;
        built
    };

    log::debug!(
        "set up socket, now have registry of:\n {:?}",
        self.registry.get_fds_for_child()
    );
    Ok(socket)
}
/// Sets up named, unbound unix datagram channels towards the parent and the child.
///
/// Returns a pair:
/// * first element — a `UnixDatagram` wrapping the fd the registry holds under
///   `name` (removed from the registry via `take`), or `None` if there is none;
/// * second element — one end of a freshly created `UnixDatagram::pair()`, whose
///   other end is registered under `name`. This is an `Err` if creating the pair
///   or registering the other end fails.
pub fn unix_datagram_pair(
    &self,
    name: String,
) -> (Option<UnixDatagram>, io::Result<UnixDatagram>) {
    let sock_info = SockInfo::UnboundUnixDatagram(name);

    // SAFETY (inside the closure): relies on the registry handing out an open
    // fd for a unix datagram socket that nothing else owns — TODO confirm
    // against `ListenerRegistry::take`.
    let to_parent = self
        .registry
        .take(&sock_info)
        .map(|fd| unsafe { UnixDatagram::from_raw_fd(fd) });

    let to_child = UnixDatagram::pair().and_then(|(ours, theirs)| {
        // NOTE(review): `theirs` is dropped when this closure returns, so this
        // relies on the registry keeping its own handle to the fd — confirm in
        // `ListenerRegistry::add`.
        let info = ListenerInfo {
            fd: theirs.as_raw_fd(),
            sock_info,
        };
        self.registry.add(info)?;
        Ok(ours)
    });

    (to_parent, to_child)
}
}