#![doc = include_str!("README.md")]
pub use blocking;
pub use chain_trans;
mod cleanable_path;
pub mod mapfut;
pub mod socket_shims;
pub mod timefut;
/// Helpers for passing the path of a liveness socket from a parent process to
/// a spawned child through an environment variable.
pub mod liveness {
    use std::{
        path::{Path, PathBuf},
        process::Command,
    };

    /// Name of the environment variable that carries the liveness socket path.
    pub const LIVENESS_ENV_VAR: &str = "SUSS_LIVENESS_SOCKET_PATH";

    /// Configures `command` so the child it spawns either sees
    /// [`LIVENESS_ENV_VAR`] set to the given path (`Some`), or does not see
    /// the variable at all (`None`).
    ///
    /// Returns the same `command` reference so calls can be chained.
    pub fn set_liveness_environment<'c>(
        command: &'c mut Command,
        child_liveness_path_state: Option<&Path>,
    ) -> &'c mut Command {
        if let Some(socket_path) = child_liveness_path_state {
            command.env(LIVENESS_ENV_VAR, socket_path.as_os_str())
        } else {
            command.env_remove(LIVENESS_ENV_VAR)
        }
    }

    /// Reads [`LIVENESS_ENV_VAR`] from this process's environment and then
    /// removes it, so a second call returns `None`.
    ///
    /// Returns `None` when the variable was not set.
    pub fn retrieve_liveness_path() -> Option<PathBuf> {
        let raw_value = std::env::var_os(LIVENESS_ENV_VAR);
        // Removal happens unconditionally, whether or not a value was present.
        std::env::remove_var(LIVENESS_ENV_VAR);
        raw_value.map(PathBuf::from)
    }
}
pub use async_trait::async_trait;
use chain_trans::Trans;
use cleanable_path::CleanablePathBuf;
pub use futures_lite::future;
pub use socket_shims::UnixSocketInterface;
use std::{
ffi::{OsStr, OsString},
fmt::Debug,
marker::PhantomData,
path::Path,
};
use std::{io::Result as IoResult, process::Child, time::Duration};
use timefut::with_timeout;
use tracing::{debug, error, info, instrument, warn};
/// A service that clients reach through a unix socket provided by the
/// `UnixSockets` socket interface. Implementors must also be [`Debug`].
#[async_trait(?Send)]
pub trait Service<UnixSockets: UnixSocketInterface>: Debug {
/// Connection type handed to clients once a bare stream has been wrapped.
type ServiceClientConnection;
/// Name of the service's socket. Callers in this file join it onto a base
/// context directory to obtain the full socket path.
fn socket_name(&self) -> &std::ffi::OsStr;
/// Converts a freshly connected bare unix stream into the client-facing
/// connection type, or reports an I/O error.
async fn wrap_connection(
&self,
bare_stream: UnixSockets::UnixStream,
) -> IoResult<Self::ServiceClientConnection>
where
UnixSockets::UnixStream: 'async_trait;
}
/// A [`Service`] that can be started on demand by spawning a child process.
#[async_trait(?Send)]
pub trait ServiceStartable<U: UnixSocketInterface>: Service<U> {
/// Spawns the process that provides this service and returns its handle.
///
/// `executor_commandline_prefix` is an optional list of command-line
/// components supplied by the caller; `liveness_path` is the optional path of
/// a liveness socket for the child (the on-demand start path in this file
/// passes the ephemeral socket it listens on).
fn run_service_command_raw(
&self,
executor_commandline_prefix: Option<&[impl AsRef<OsStr> + Sized + Debug]>,
liveness_path: Option<&Path>,
) -> IoResult<Child>;
/// Hook that receives the child handle after the liveness check succeeded.
///
/// The default implementation ignores (and thereby drops) the handle and
/// returns `Ok(())`.
async fn after_post_liveness_subprocess(&self, _: Child) -> IoResult<()> {
Ok(())
}
}
/// Builds a randomly named socket path of the form `temp-<16 hex digits>.sock`
/// inside the system temporary directory.
///
/// Only the path is computed; nothing is created on disk.
fn get_random_sockpath() -> std::path::PathBuf {
    use nanorand::rand::{chacha::ChaCha20, Rng};

    let temp_directory = std::env::temp_dir();
    let random_token = ChaCha20::new().generate::<u64>();
    temp_directory.join(format!("temp-{:016x}.sock", random_token))
}
/// Binds a listener on a freshly generated random socket path.
///
/// On success returns the listener together with the [`CleanablePathBuf`]
/// wrapping its path; on failure the bind error is logged and returned.
#[instrument]
async fn ephemeral_liveness_socket_create<U: UnixSocketInterface>(
) -> IoResult<(U::UnixListener, CleanablePathBuf)> {
    let socket_path = CleanablePathBuf::new(get_random_sockpath());
    info!(
        "Creating ephemeral liveness socket @ {}",
        socket_path.as_ref().display()
    );
    let bind_outcome = U::unix_listener_bind(socket_path.as_ref()).await;
    match bind_outcome {
        Ok(listener) => Ok((listener, socket_path)),
        Err(bind_error) => {
            error!(
                "Couldn't create ephemeral liveness socket @ {} - {}",
                socket_path.as_ref().display(),
                bind_error
            );
            Err(bind_error)
        }
    }
}
/// Waits up to `liveness_timeout` for a single connection on the ephemeral
/// listener and shuts that connection down again.
///
/// Returns a `TimedOut` error when no connection arrives in time, or the
/// accept / shutdown error otherwise. The listener and its path are dropped
/// explicitly once the ping has been handled.
async fn ephemeral_liveness_socket_check_with_timeout<U: UnixSocketInterface>(
    mut ephemeral_listener: U::UnixListener,
    listener_path: CleanablePathBuf,
    liveness_timeout: Duration,
) -> IoResult<()> {
    let timed_accept = with_timeout(
        U::unix_listener_accept(&mut ephemeral_listener),
        liveness_timeout,
    )
    .await;

    // `None` means the timeout elapsed before anything connected.
    let accept_outcome = match timed_accept {
        Some(outcome) => outcome,
        None => Err(std::io::Error::new(
            std::io::ErrorKind::TimedOut,
            format!(
                "Timed out waiting for service to become live after {}",
                humantime::format_duration(liveness_timeout)
            ),
        )),
    };

    // Only the stream half of the accepted pair is needed.
    let (mut ping_stream, _) = match accept_outcome {
        Ok(accepted) => accepted,
        Err(accept_error) => {
            error!(
                "Failed to receive liveness ping for service on ephemeral socket {} - {}",
                listener_path.as_ref().display(),
                accept_error
            );
            return Err(accept_error);
        }
    };

    U::unix_stream_shutdown(&mut ping_stream).await?;
    drop(ephemeral_listener);
    drop(listener_path);
    Ok(())
}
/// Convenience methods available on every [`Service`]: pairing a service with
/// a base context directory, and connecting to it (optionally starting it on
/// demand).
#[async_trait(?Send)]
pub trait ServiceExt<UnixSockets: UnixSocketInterface>: Service<UnixSockets> {
    /// Pairs this service with a base context directory, without an executor
    /// prefix.
    fn reify(self, base_context_directory: &Path) -> ReifiedService<'_, Self, UnixSockets>
    where
        Self: Sized,
    {
        ReifiedService::reify_service(self, base_context_directory)
    }

    /// Pairs this service with a base context directory and an executor
    /// command-line prefix.
    fn reify_with_executor<'i, EPC: AsRef<OsStr> + Sized + Debug>(
        self,
        base_context_directory: &'i Path,
        executor_prefix: &'i [EPC],
    ) -> ReifiedService<'i, Self, UnixSockets, EPC>
    where
        Self: Sized,
    {
        ReifiedService::reify_service_with_executor(self, base_context_directory, executor_prefix)
    }

    /// Connects to the service's socket inside `base_context_directory` and
    /// wraps the resulting stream via [`Service::wrap_connection`].
    ///
    /// # Errors
    ///
    /// Returns the connect error (after logging it) or the error produced by
    /// `wrap_connection`.
    #[instrument]
    async fn connect_to_running_service(
        &self,
        base_context_directory: &Path,
    ) -> IoResult<Self::ServiceClientConnection> {
        let server_socket_path = base_context_directory.join(Self::socket_name(self));
        info!(
            "Attempting connection to service @ {}",
            server_socket_path.display()
        );
        let unix_stream = UnixSockets::unix_stream_connect(&server_socket_path)
            .await
            .map_err(|e| {
                // Include the underlying error, as every other failure log in
                // this file does; the path alone does not say why it failed.
                error!(
                    "Failed to connect to service @ {} - {}",
                    server_socket_path.display(),
                    e
                );
                e
            })?;
        info!("Successfully connected @ {}", server_socket_path.display());
        self.wrap_connection(unix_stream).await
    }

    /// Connects to the service, starting it on demand if the first connection
    /// attempt fails.
    ///
    /// On a failed first attempt this creates an ephemeral liveness socket,
    /// spawns the service via [`ServiceStartable::run_service_command_raw`]
    /// with that socket's path, waits up to `liveness_timeout` for the ping,
    /// hands the child to
    /// [`ServiceStartable::after_post_liveness_subprocess`], and then tries to
    /// connect once more.
    ///
    /// # Errors
    ///
    /// Any failure of the steps above is returned to the caller.
    #[instrument]
    async fn connect_to_service(
        &self,
        executor_commandline_prefix: Option<&[impl AsRef<OsStr> + Sized + Debug]>,
        base_context_directory: &Path,
        liveness_timeout: Duration,
    ) -> IoResult<Self::ServiceClientConnection>
    where
        Self: ServiceStartable<UnixSockets>,
    {
        match self
            .connect_to_running_service(base_context_directory)
            .await
        {
            Ok(s) => Ok(s),
            Err(e) => {
                warn!("Error connecting to existing service - {} - attempting on-demand service start", e);
                let (ephemeral_listener, ephemeral_socket_path) =
                    ephemeral_liveness_socket_create::<UnixSockets>().await?;
                let child_proc = self
                    .run_service_command_raw(
                        executor_commandline_prefix,
                        Some(ephemeral_socket_path.as_ref()),
                    )
                    .map_err(|e| {
                        error!("Could not start child service process - {}", e);
                        e
                    })?;
                // NOTE(review): if the liveness check fails, `child_proc` is
                // dropped here without being killed or waited on — confirm
                // that leaving the child running is intended.
                ephemeral_liveness_socket_check_with_timeout::<UnixSockets>(
                    ephemeral_listener,
                    ephemeral_socket_path,
                    liveness_timeout,
                )
                .await?;
                self.after_post_liveness_subprocess(child_proc).await?;
                info!("Successfully received ephemeral liveness ping - trying to connect to service again.");
                self.connect_to_running_service(base_context_directory)
                    .await
            }
        }
    }
}
// Blanket implementation: every `Service<U>` gets the `ServiceExt<U>` methods.
impl<U: UnixSocketInterface, S: Service<U>> ServiceExt<U> for S {}
/// Server-side implementation of a [`Service`] `S` over the socket interface
/// `U`. Implementors must also be [`Debug`].
#[async_trait]
pub trait Server<S: Service<U>, U: UnixSocketInterface>: Debug {
/// Type produced by wrapping the raw listener socket.
type ListenerWrapper;
/// Value produced when the server finishes running.
type FinalOutput;
/// Wraps the raw, already-bound listener socket into the server's own
/// listener type.
async fn wrap_listener_socket(
&self,
service: &S,
socket: U::UnixListener,
) -> IoResult<Self::ListenerWrapper>;
/// Runs the server on the wrapped listener and yields its final output.
async fn run_server(
&self,
service: &S,
wrapper: Self::ListenerWrapper,
) -> IoResult<Self::FinalOutput>;
}
#[instrument]
async fn notify_liveness_socket<U: UnixSocketInterface>(
liveness_socket_path: &Path,
) -> IoResult<()> {
let mut sock = U::unix_stream_connect(liveness_socket_path).await
.map_err(|e| {
warn!("Couldn't connect to parent process's ephemeral liveness socket @ {} - error was: {}", liveness_socket_path.display(), e);
e
})?.trans_inspect(|_sock| {
info!(
"Ping'ed liveness socket @ {} with connection, shutting ephemeral connection.",
liveness_socket_path.display()
);
});
U::unix_stream_shutdown(&mut sock).await
}
/// Adds the full "bind, announce liveness, serve" sequence to every
/// [`Server`].
#[async_trait(?Send)]
pub trait ServerExt<S: Service<U>, U: UnixSocketInterface>: Server<S, U> {
    /// Binds the service socket inside `context_base_path`, optionally pings
    /// the liveness socket, then wraps the listener and runs the server.
    ///
    /// The outcome of the liveness ping is deliberately discarded: a failed
    /// ping does not stop the server from starting.
    ///
    /// # Errors
    ///
    /// Returns the error from binding, from wrapping the listener, or from
    /// running the server.
    #[instrument]
    async fn start_and_run_server(
        &self,
        service: &S,
        context_base_path: &Path,
        liveness_socket_path: Option<&Path>,
    ) -> IoResult<Self::FinalOutput> {
        let socket_path: CleanablePathBuf = context_base_path.join(service.socket_name()).into();
        info!("Obtaining socket @ {}", socket_path.as_ref().display());
        let bound_listener = U::unix_listener_bind(socket_path.as_ref()).await?;
        info!(
            "Successfully listening @ {}",
            socket_path.as_ref().display()
        );

        if let Some(parent_liveness_path) = liveness_socket_path {
            // Best effort only; the result is intentionally ignored.
            let _ = notify_liveness_socket::<U>(parent_liveness_path).await;
        } else {
            info!("No liveness socket path provided, assuming autonomous.");
        }

        debug!("Wrapping raw socket in API");
        let wrapped_listener = self.wrap_listener_socket(service, bound_listener).await?;
        info!("Starting service @ {}", socket_path.as_ref().display());
        let final_output = self.run_server(service, wrapped_listener).await?;
        info!("Cleaning up socket @ {}", socket_path.as_ref().display());
        drop(socket_path);
        Ok(final_output)
    }
}
// Blanket implementation: every `Server<S, U>` gets the `ServerExt<S, U>` methods.
impl<U: UnixSocketInterface, S: Service<U>, T: Server<S, U>> ServerExt<S, U> for T {}
/// A [`Service`] bundled with the information needed to connect to it or
/// serve it: a base context directory and an optional executor command-line
/// prefix, both borrowed for `'info`.
pub struct ReifiedService<
'info,
S: Service<U>,
U: UnixSocketInterface,
ExecutorPrefixComponent: AsRef<OsStr> + Sized + Debug = OsString,
> {
/// Optional command-line components forwarded when starting the service.
executor_prefix: Option<&'info [ExecutorPrefixComponent]>,
/// Directory that the service's socket name is resolved against.
base_context_directory: &'info Path,
/// The wrapped service value.
bare_service: S,
/// Ties the struct to the socket interface `U` without storing a value.
_unix_socket_iface: PhantomData<U>,
}
/// Manual [`Debug`] implementation that prints the executor prefix, the base
/// context directory and the wrapped service, and marks the output as
/// non-exhaustive (the socket-interface marker field is omitted).
impl<'info, S, U, ExecutorPrefixComponent> Debug
    for ReifiedService<'info, S, U, ExecutorPrefixComponent>
where
    S: Service<U>,
    U: UnixSocketInterface,
    ExecutorPrefixComponent: AsRef<OsStr> + Sized + Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("ReifiedService");
        repr.field("executor_prefix", &self.executor_prefix);
        repr.field("base_context_directory", &self.base_context_directory);
        repr.field("bare_service", &self.bare_service);
        repr.finish_non_exhaustive()
    }
}
impl<'info, S, U, ExecutorPrefixComponent> ReifiedService<'info, S, U, ExecutorPrefixComponent>
where
    S: Service<U>,
    U: UnixSocketInterface,
    ExecutorPrefixComponent: AsRef<OsStr> + Sized + Debug,
{
    /// Wraps `service` together with its base context directory and no
    /// executor prefix.
    pub fn reify_service(service: S, base_context_directory: &'info Path) -> Self {
        Self {
            bare_service: service,
            base_context_directory,
            executor_prefix: None,
            _unix_socket_iface: PhantomData,
        }
    }

    /// Wraps `service` together with its base context directory and the given
    /// executor prefix.
    pub fn reify_service_with_executor(
        service: S,
        base_context_directory: &'info Path,
        executor_prefix: &'info [ExecutorPrefixComponent],
    ) -> Self {
        Self {
            bare_service: service,
            base_context_directory,
            executor_prefix: Some(executor_prefix),
            _unix_socket_iface: PhantomData,
        }
    }

    /// Connects to the service, starting it on demand when it is not already
    /// reachable; forwards to [`ServiceExt::connect_to_service`] with the
    /// stored executor prefix and base context directory.
    #[instrument]
    pub async fn connect(&self, liveness_timeout: Duration) -> IoResult<S::ServiceClientConnection>
    where
        S: ServiceStartable<U>,
    {
        let service = &self.bare_service;
        service
            .connect_to_service(
                self.executor_prefix,
                self.base_context_directory,
                liveness_timeout,
            )
            .await
    }

    /// Connects to the service without attempting to start it; forwards to
    /// [`ServiceExt::connect_to_running_service`].
    #[instrument]
    pub async fn connect_to_running(&self) -> IoResult<S::ServiceClientConnection> {
        let service = &self.bare_service;
        service
            .connect_to_running_service(self.base_context_directory)
            .await
    }

    /// Serves this service with `server`; forwards to
    /// [`ServerExt::start_and_run_server`] using the stored base context
    /// directory and the optional liveness socket path.
    #[instrument]
    pub async fn serve_service_implementation<ServiceServer: ServerExt<S, U>>(
        &self,
        server: &ServiceServer,
        liveness_socket_path: Option<&Path>,
    ) -> IoResult<ServiceServer::FinalOutput> {
        let service = &self.bare_service;
        server
            .start_and_run_server(service, self.base_context_directory, liveness_socket_path)
            .await
    }
}
/// Constructors shared by service bundles (such as the types generated by
/// `declare_service_bundle!`).
pub trait ServiceBundle<ExecutorPrefixComponent: AsRef<OsStr> + Sized = OsString> {
/// Creates a bundle for the given base context directory, without an
/// executor prefix.
fn new(base_context_directory: &Path) -> Self;
/// Creates a bundle for the given base context directory with an executor
/// command-line prefix.
fn with_executor_prefix(
base_context_directory: &Path,
executor_prefix: &[ExecutorPrefixComponent],
) -> Self;
}
/// Declares a unit-struct service type and implements [`Service`] for it.
///
/// Accepted shape (the command part and the trailing `impl { … }` are
/// optional):
///
/// ```text
/// declare_service! {
///     #[attrs] vis Name <SocketInterfaceType> = {
///         "command" "arg"* @ "socket-name.sock"
///         as raw |stream| -> Io<ConnectionType> { /* body yielding io::Result */ }
///     } impl { /* generic parameters for the generated impls */ }
/// }
/// ```
///
/// When a command literal is present, `ServiceStartable` is implemented too:
/// the generated `run_service_command_raw` spawns the executor prefix (if
/// any), followed by the command and its arguments, with the liveness
/// environment variable configured through
/// `liveness::set_liveness_environment`.
///
/// Only the `raw` stream-preprocess method is defined by this macro.
#[macro_export]
macro_rules! declare_service {
    // Public entry point.
    {
        $(#[$service_meta:meta])*
        $vis:vis $service_name:ident <$unix_sock_impl:ty> = {
            $($command:literal $($args:literal)*)? @ $socket_name:literal
            as $unix_stream_preprocess_method:ident $($unix_stream_preprocess_spec:tt)*
        } $(impl {$($typeparam_constraints:tt)*})?
    } => {
        $(#[$service_meta])*
        #[derive(Debug)]
        $vis struct $service_name;

        #[$crate::async_trait(?Send)]
        impl $(<$($typeparam_constraints)*>)? $crate::Service <$unix_sock_impl> for $service_name {
            type ServiceClientConnection = $crate::declare_service!(@socket_connection_type $unix_stream_preprocess_method $($unix_stream_preprocess_spec)*);

            #[inline]
            fn socket_name(&self) -> &::std::ffi::OsStr {
                ::std::ffi::OsStr::new($socket_name)
            }

            // The return type is spelled with an absolute path: type names in
            // a macro expansion resolve at the call site, so a bare `IoResult`
            // alias would only compile where the caller happens to define one.
            #[inline]
            async fn wrap_connection(&self, bare_stream: <$unix_sock_impl as $crate::socket_shims::UnixSocketInterface>::UnixStream) -> ::std::io::Result<Self::ServiceClientConnection>
            where <$unix_sock_impl as $crate::socket_shims::UnixSocketInterface>::UnixStream: 'async_trait
            {
                $crate::declare_service!(@wrap_implementation bare_stream $unix_stream_preprocess_method $($unix_stream_preprocess_spec)*)
            }
        }

        $crate::declare_service!{@maybe_autostart_impl $(with_cli {$command $($args)*})? with_name $service_name <$unix_sock_impl> $(with_constraints {$($typeparam_constraints)*})?}
    };

    // Internal: a command was given, so the service can be started on demand.
    {@maybe_autostart_impl
        with_cli {$command:literal $($args:literal)*}
        with_name $service_name:ident <$unix_sock_impl:ty>
        $(with_constraints {$($typeparam_constraints:tt)*})?
    } => {
        #[$crate::async_trait(?Send)]
        impl $(<$($typeparam_constraints)*>)? $crate::ServiceStartable <$unix_sock_impl> for $service_name {
            fn run_service_command_raw(
                &self,
                executor_commandline_prefix: ::core::option::Option<&[impl ::core::convert::AsRef<::std::ffi::OsStr> + ::std::fmt::Debug]>,
                liveness_path: ::core::option::Option<&::std::path::Path>,
            ) -> ::std::io::Result<::std::process::Child> {
                use ::std::{process::Command, iter::{Iterator, IntoIterator, once}, ffi::OsStr};
                use $crate::chain_trans::prelude::*;
                // Executor prefix (if any), then the command, then its arguments.
                let mut all_components_iterator = executor_commandline_prefix
                    .map(|l| l.iter()).into_iter()
                    .flatten()
                    .map(::core::convert::AsRef::as_ref)
                    .chain(once(OsStr::new($command)))
                    .chain([$(OsStr::new($args)),*].into_iter());
                // The first component is the program; the rest are its arguments.
                let program = all_components_iterator.next().expect("There must be at least one thing in the iterator - the program to run, itself.");
                Command::new(program)
                    .trans_mut(|cmd| { $crate::liveness::set_liveness_environment(cmd, liveness_path); })
                    .args(all_components_iterator)
                    .spawn()
            }
        }
    };

    // Internal: no command was given, so no `ServiceStartable` impl is generated.
    {@maybe_autostart_impl
        with_name $service_name:ident <$unix_sock_impl:ty>
        $(with_constraints {$($typeparam_constraints:tt)*})?
    } => {};

    // Internal: connection type named by a `raw` preprocess spec.
    {@socket_connection_type raw |$unix_socket:ident| -> Io<$result:ty> $body:block } => { $result };

    // Internal: body of `wrap_connection` for a `raw` preprocess spec.
    {@wrap_implementation $stream_ident:ident raw |$unix_socket:ident| -> Io<$result:ty> $body:block} => {{
        let inner_closure = |$unix_socket| -> ::std::io::Result<$result> { $body };
        async { inner_closure($stream_ident) }.await
    }};
}
/// Declares a bundle struct, generic over a socket interface, together with
/// the services it exposes.
///
/// The generated struct stores an owned base context path and an optional
/// owned executor prefix, and implements [`ServiceBundle`]. Every
/// `fn name() -> Type <Sock> = { … }` entry (entries are separated by `;`) is
/// forwarded to [`declare_service!`] and additionally becomes a method on the
/// bundle that returns the matching [`ReifiedService`].
#[macro_export]
macro_rules! declare_service_bundle {
    {
        $(#[$bundle_meta:meta])*
        $bundle_vis:vis $bundle_name:ident <$socket_bundle_impl:ident> {$(
            $(#[$service_meta:meta])*
            $service_vis:vis fn $service_fn_name:ident () -> $service_type_name:ident <$unix_sock_impl:ty> = { $($service_definition:tt)*}
            $(impl { $($unix_sock_constraints:tt)* })?
        );*}
    } => {
        $(#[$bundle_meta])*
        $bundle_vis struct $bundle_name <$socket_bundle_impl: $crate::socket_shims::UnixSocketInterface>{
            executor_prefix: ::core::option::Option::<::std::vec::Vec::<::std::ffi::OsString>>,
            base_context_path: ::std::path::PathBuf,
            _socket_iface: ::core::marker::PhantomData<$socket_bundle_impl>
        }

        impl <$socket_bundle_impl: $crate::socket_shims::UnixSocketInterface> $crate::ServiceBundle for $bundle_name<$socket_bundle_impl>{
            fn new(base_context_path: &::std::path::Path) -> Self {
                Self {
                    executor_prefix: ::core::option::Option::None,
                    base_context_path: base_context_path.to_path_buf(),
                    _socket_iface: ::core::marker::PhantomData
                }
            }

            fn with_executor_prefix(base_context_path: &::std::path::Path, executor_prefix: &[::std::ffi::OsString]) -> Self {
                Self {
                    executor_prefix: ::core::option::Option::Some(executor_prefix.to_vec()),
                    base_context_path: base_context_path.to_path_buf(),
                    _socket_iface: ::core::marker::PhantomData
                }
            }
        }

        // One service type per bundle entry.
        $(
            $crate::declare_service!{
                $(#[$service_meta])*
                $service_vis $service_type_name <$unix_sock_impl> = { $($service_definition)* } $(impl {$($unix_sock_constraints)*})?
            }
        )*

        // One accessor per bundle entry, borrowing the bundle's stored paths.
        impl <$socket_bundle_impl: $crate::socket_shims::UnixSocketInterface> $bundle_name<$socket_bundle_impl> {$(
            $service_vis fn $service_fn_name(&self) -> $crate::ReifiedService<'_, $service_type_name, $socket_bundle_impl>
                where $service_type_name: $crate::Service::<$socket_bundle_impl>
            {
                match self.executor_prefix.as_deref() {
                    ::core::option::Option::Some(stored_prefix) => $crate::ReifiedService::reify_service_with_executor($service_type_name, &self.base_context_path, stored_prefix),
                    ::core::option::Option::None => $crate::ReifiedService::reify_service($service_type_name, &self.base_context_path),
                }
            }
        )*}
    }
}
/// Re-exports of the items most callers need: the two declaration macros,
/// the core service types/traits, and `futures_lite`'s `block_on` under the
/// name `futures_lite_block_on`.
pub mod prelude {
pub use super::{
declare_service, declare_service_bundle, ReifiedService, ServiceBundle, ServiceExt,
UnixSocketInterface,
};
pub use futures_lite::future::block_on as futures_lite_block_on;
}
// Tests for the declaration macros. The declared commands are names that are
// not expected to exist, so every `connect` attempt is asserted to fail.
#[cfg(test)]
mod tests {
use std::env::temp_dir;
use futures_lite::future::block_on;
use crate::socket_shims::StdThreadpoolUSocks;
use super::*;
// Declares a startable service and checks that connecting to it reports an
// error; also checks that a declaration without a command expands.
#[test]
pub fn service_declaration_and_start_fail_test() {
let tmpdir = temp_dir();
declare_service! {
pub TestService <U> = {
"sfdjfkosdgjsadgjlas" @ "test-service.sock" as raw |unix_socket| -> Io<U::UnixStream> {
Ok(unix_socket)
}
} impl {U: UnixSocketInterface}
}
assert!(block_on(
ServiceExt::<StdThreadpoolUSocks>::reify(TestService, &tmpdir)
.connect(Duration::from_millis(50))
)
.is_err());
// No command literal here: this only has to compile.
declare_service! {
pub TestService2 <U> = {@"test-service-2.sock" as raw |unix_socket| -> Io<U::UnixStream> {
Ok(unix_socket)
}} impl {U: UnixSocketInterface}
}
}
// Declares a bundle with one concrete-socket service and one generic service,
// then checks that connecting through either accessor reports an error.
#[test]
pub fn service_bundle_macro_test() {
declare_service_bundle! {
pub TestBundle <B> {
pub fn echo_service() -> EchoService <StdThreadpoolUSocks> = {
"echo-executable-wekdasjkfgnjsd" "--and" "--some" "--args" @ "echo-service.sock" as raw |unix_socket| ->
Io< <StdThreadpoolUSocks as UnixSocketInterface>::UnixStream> { Ok(unix_socket) }
};
pub fn hello_service() -> HelloService<U> = {
"hello-executable-fjskldgkjsagd" "a" @ "hello-service.sock" as raw |unix_socket| -> Io<U::UnixStream> { Ok(unix_socket) }
} impl {U: UnixSocketInterface}
}
}
let tmpdir = temp_dir();
let wonderful_bundle = TestBundle::<StdThreadpoolUSocks>::new(&tmpdir);
assert!(block_on(
wonderful_bundle
.echo_service()
.connect(Duration::from_millis(50))
)
.is_err());
assert!(block_on(
wonderful_bundle
.hello_service()
.connect(Duration::from_millis(50))
)
.is_err())
}
}