mod command;
mod stop_at;
mod tasks;
mod util;
use std::{
fmt::{self, Display, Formatter},
fs, io,
path::{Path, PathBuf},
};
use datasize::DataSize;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{net::UnixListener, sync::watch};
use tracing::{debug, error, info, warn};
use crate::{
components::{Component, ComponentState, InitializedComponent, PortBoundComponent},
effect::{
announcements::ControlAnnouncement,
diagnostics_port::DumpConsensusStateRequest,
requests::{NetworkInfoRequest, SetNodeStopRequest},
EffectBuilder, EffectExt, Effects,
},
reactor::main_reactor::MainEvent,
types::NodeRng,
utils::umask,
WithDir,
};
pub(crate) use stop_at::StopAtSpec;
pub use tasks::FileSerializer;
use util::ShowUnixAddr;
/// Name of this component, as returned by its `Component::name` implementation.
const COMPONENT_NAME: &str = "diagnostics_port";
/// Diagnostics port configuration.
#[derive(Clone, DataSize, Debug, Serialize, Deserialize)]
pub struct Config {
/// Whether the diagnostics port is enabled; this flag is handed to `bind` during initialization.
pub enabled: bool,
/// Path of the unix socket to listen on; it is passed through `WithDir::with_dir` before binding.
pub socket_path: PathBuf,
/// `umask` value handed to `umask::temp_umask` around the creation of the socket.
pub socket_umask: u16,
}
impl Default for Config {
fn default() -> Self {
Self {
enabled: false,
socket_path: "debug.socket".into(),
socket_umask: 0o077,
}
}
}
/// Diagnostics port component.
#[derive(Debug, DataSize)]
pub(crate) struct DiagnosticsPort {
    /// Current lifecycle state of the component.
    state: ComponentState,
    /// Sending half of the shutdown channel created in `listen`; the matching receiver is passed
    /// to `tasks::server`. The field is never read in this file.
    // NOTE(review): presumably held only so that dropping the component signals the server to
    // shut down -- confirm against `tasks::server`.
    #[data_size(skip)]
    _shutdown_sender: Option<watch::Sender<()>>,
    /// Diagnostics port configuration, wrapped in `WithDir`.
    config: WithDir<Config>,
}
impl DiagnosticsPort {
    /// Creates a new diagnostics port component from `config`.
    ///
    /// The component starts in the `Uninitialized` state and without a shutdown sender; nothing
    /// is bound here.
    pub(crate) fn new(config: WithDir<Config>) -> Self {
        Self {
            state: ComponentState::Uninitialized,
            _shutdown_sender: None,
            config,
        }
    }
}
/// Diagnostics port event.
#[derive(Debug, Serialize)]
pub(crate) enum Event {
/// Initialization request; when handled while the component is `Initializing`, it triggers `bind`.
Initialize,
}
impl Display for Event {
    /// Writes the same fixed description for every event variant.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        write!(formatter, "diagnostics port event")
    }
}
/// Error produced while setting up the diagnostics port.
#[derive(Debug, Error)]
pub(crate) enum Error {
/// An I/O error occurred while setting up the listener; any `io::Error` converts into this variant.
#[error("could not setup diagnostics port listener")]
SetupListener(#[from] io::Error),
}
impl<REv> Component<REv> for DiagnosticsPort
where
    REv: From<Event>
        + From<DumpConsensusStateRequest>
        + From<ControlAnnouncement>
        + From<NetworkInfoRequest>
        + From<SetNodeStopRequest>
        + Send,
{
    type Event = Event;

    /// Handles a diagnostics port event according to the component's current state.
    ///
    /// * `Fatal`: logs an error and returns no effects.
    /// * `Uninitialized`: logs a warning and returns no effects.
    /// * `Initializing`: `Event::Initialize` calls `bind` with the configured `enabled` flag,
    ///   stores the state `bind` returns and yields the effects it returns.
    /// * `Initialized`: returns no effects.
    fn handle_event(
        &mut self,
        effect_builder: EffectBuilder<REv>,
        _rng: &mut NodeRng,
        event: Event,
    ) -> Effects<Event> {
        match &self.state {
            ComponentState::Fatal(msg) => {
                error!(
                    msg,
                    ?event,
                    name = <Self as Component<MainEvent>>::name(self),
                    "should not handle this event when this component has fatal error"
                );
                Effects::new()
            }
            ComponentState::Uninitialized => {
                warn!(
                    ?event,
                    name = <Self as Component<MainEvent>>::name(self),
                    "should not handle this event when component is uninitialized"
                );
                Effects::new()
            }
            ComponentState::Initializing => match event {
                Event::Initialize => {
                    // This arm is only entered while the state is `Initializing`, so no further
                    // state check is required before binding.
                    let (effects, state) = self.bind(self.config.value().enabled, effect_builder);
                    <Self as InitializedComponent<MainEvent>>::set_state(self, state);
                    effects
                }
            },
            ComponentState::Initialized => Effects::new(),
        }
    }

    /// Returns the name of this component.
    fn name(&self) -> &str {
        COMPONENT_NAME
    }
}
impl<REv> InitializedComponent<REv> for DiagnosticsPort
where
REv: From<Event>
+ From<DumpConsensusStateRequest>
+ From<ControlAnnouncement>
+ From<NetworkInfoRequest>
+ From<SetNodeStopRequest>
+ Send,
{
fn state(&self) -> &ComponentState {
&self.state
}
fn set_state(&mut self, new_state: ComponentState) {
info!(
?new_state,
name = <Self as Component<MainEvent>>::name(self),
"component state changed"
);
self.state = new_state;
}
}
impl<REv> PortBoundComponent<REv> for DiagnosticsPort
where
REv: From<Event>
+ From<DumpConsensusStateRequest>
+ From<ControlAnnouncement>
+ From<NetworkInfoRequest>
+ From<SetNodeStopRequest>
+ Send,
{
type Error = Error;
type ComponentEvent = Event;
fn listen(
&mut self,
effect_builder: EffectBuilder<REv>,
) -> Result<Effects<Event>, Self::Error> {
let (shutdown_sender, shutdown_receiver) = watch::channel(());
self._shutdown_sender = Some(shutdown_sender);
let cfg = self.config.value();
let socket_path = self.config.with_dir(cfg.socket_path.clone());
let listener = setup_listener(
&socket_path,
#[allow(clippy::useless_conversion)]
cfg.socket_umask.into(),
)?;
let server = tasks::server(effect_builder, socket_path, listener, shutdown_receiver);
Ok(server.ignore())
}
}
/// Binds a unix socket listener at `path`.
///
/// If something already exists at `path`, its removal is attempted first. A failed removal is
/// only logged as a warning and binding is attempted regardless. The guard returned by
/// `umask::temp_umask(socket_umask)` is alive only while the socket is being bound.
///
/// # Errors
///
/// Propagates I/O errors from `UnixListener::bind` and from the `local_addr` call used in the
/// final log line.
fn setup_listener<P: AsRef<Path>>(path: P, socket_umask: umask::Mode) -> io::Result<UnixListener> {
    let socket_path = path.as_ref();

    // Naive check-then-delete of whatever currently occupies the socket path.
    if socket_path.exists() {
        debug!(socket_path=%socket_path.display(), "found stale socket file, trying to remove");
        if let Err(err) = fs::remove_file(socket_path) {
            warn!(%err, "could not remove stale socket file, assuming race with other process");
        } else {
            debug!("stale socket file removed");
        }
    }

    // The guard is released at the end of this block, on success as well as on early return.
    // NOTE(review): presumably dropping the guard restores the previous umask -- confirm against
    // `umask::temp_umask`.
    let listener = {
        let _umask_guard = umask::temp_umask(socket_umask);
        UnixListener::bind(socket_path)?
    };

    debug!(local_addr=%ShowUnixAddr(&listener.local_addr()?), "diagnostics port listening");
    Ok(listener)
}
#[cfg(test)]
mod tests {
    use std::{
        fs,
        os::unix::prelude::{FileTypeExt, PermissionsExt},
    };

    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::UnixStream,
    };

    use super::setup_listener;

    /// With umask `0o077`, the created socket has no group/other permission bits set, and a
    /// message written by a connecting client can be read in full from the accepted connection.
    #[tokio::test]
    async fn setup_listener_creates_listener() {
        const TEST_MESSAGE: &[u8] = b"hello, world!";

        let tmpdir = tempfile::tempdir().expect("could not create tempdir");
        let socket_path = tmpdir.path().join("test.socket");

        let listener = setup_listener(&socket_path, 0o077).expect("could not setup listener");

        let mode = fs::metadata(&socket_path)
            .expect("could not get metadata")
            .permissions()
            .mode();
        assert_eq!(mode & 0o077, 0);

        // Client side: connect and send the message. The stream is dropped when the task ends,
        // which lets `read_to_end` below complete.
        tokio::spawn(async move {
            let mut client = UnixStream::connect(socket_path)
                .await
                .expect("could not connect to listener");
            client
                .write_all(TEST_MESSAGE)
                .await
                .expect("could not write to listener");
        });

        let (mut accepted, _peer_addr) = listener
            .accept()
            .await
            .expect("could not accept connection");

        let mut received = Vec::new();
        accepted
            .read_to_end(&mut received)
            .await
            .expect("failed to read to end");
        assert_eq!(TEST_MESSAGE, received.as_slice());
    }

    /// A regular file occupying the socket path is replaced by a socket.
    #[tokio::test]
    async fn setup_listener_removes_previous_listener() {
        let tmpdir = tempfile::tempdir().expect("could not create tempdir");
        let socket_path = tmpdir.path().join("overwrite-me.socket");

        // Occupy the socket path with an ordinary file.
        fs::write(&socket_path, b"this-file-should-be-deleted-soon")
            .expect("could not write to socket-blocking temporary file");
        let is_socket_before = fs::metadata(&socket_path)
            .expect("could not get metadata")
            .file_type()
            .is_socket();
        assert!(
            !is_socket_before,
            "temporary file created should not be a socket"
        );

        let _listener = setup_listener(&socket_path, 0o022).expect("could not setup listener");

        let is_socket_after = fs::metadata(&socket_path)
            .expect("could not get metadata")
            .file_type()
            .is_socket();
        assert!(is_socket_after, "did not overwrite previous file");
    }
}