use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::time::Duration;
use crossbeam_channel::{Receiver, Sender, after, bounded, select};
use sozu_command_lib::{
channel::Channel,
config::Config,
proto::command::{
MetricDetail, Request, Response, ResponseStatus, SetMetricDetail, request::RequestType,
},
};
use crate::cli::TopDetail;
use crate::ctl::create_channel;
use super::CtlError;
/// Shared slot holding at most one pending status message.
///
/// Background threads publish into it (see `publish_status`); readers drain
/// it with [`take_status`]. A newer message overwrites one not yet taken.
pub type StatusSlot = Arc<Mutex<Option<String>>>;

/// Returns a fresh, empty [`StatusSlot`].
pub fn new_status_slot() -> StatusSlot {
    StatusSlot::default()
}

/// Removes and returns the pending status message, leaving the slot empty.
///
/// Lock poisoning is tolerated: a panic in another thread while it held the
/// lock cannot leave an `Option<String>` in an invalid state, so the inner
/// value is recovered and drained as usual.
pub fn take_status(slot: &StatusSlot) -> Option<String> {
    slot.lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .take()
}
/// Stores `msg` in `slot`, replacing any message that has not been taken yet.
pub(super) fn publish_status(slot: &StatusSlot, msg: String) {
    // A poisoned lock only means another thread panicked while holding it;
    // any `Option<String>` value is still valid, so recover and overwrite.
    let mut pending = slot.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    *pending = Some(msg);
}
/// Commands processed, one at a time, by the owner thread started in
/// `spawn_owner`. That thread is the only user of the command channel.
enum DetailRequest {
    /// Send the initial `SetMetricDetail` (detail level + TTL).
    Apply {
        /// Receives the outcome. On failure the owner thread exits after
        /// replying.
        reply: Sender<Result<(), CtlError>>,
    },
    /// Re-send the same detail level and TTL. On failure the owner publishes
    /// a status message and exits.
    Renew,
    /// Ask the server to clear this client's request; the outcome is ignored.
    Clear,
    /// Terminate the owner thread.
    Stop,
}
/// RAII handle for a raised metric detail level.
///
/// Created by [`DetailGuard::apply`]. While it lives, a renewer thread keeps
/// re-sending the request. Dropping it does three things: stops the renewer,
/// asks the owner thread to clear the request, and joins both threads.
#[must_use = "dropping the guard immediately clears the metric detail request"]
pub struct DetailGuard {
    /// Mailbox of the owner thread, which holds the command channel.
    tx: Sender<DetailRequest>,
    /// Owner thread handle; `None` once joined in `Drop`.
    owner_handle: Option<JoinHandle<()>>,
    /// Set in `Drop` so the renewer exits instead of queuing another renewal.
    renewer_stop: Arc<AtomicBool>,
    /// Never sent on. Dropping it disconnects the renewer's wake channel,
    /// which interrupts the renewer's timer wait. `None` once dropped.
    renewer_wake_tx: Option<Sender<()>>,
    /// Renewer thread handle; `None` once joined in `Drop`.
    renewer_handle: Option<JoinHandle<()>>,
    // Not read by this module, hence the `dead_code` allowance; the owner
    // thread uses its own copy. NOTE(review): presumably kept for diagnostics
    // — confirm it is still wanted, otherwise remove the field.
    #[allow(dead_code)]
    client_id: String,
    // Not read by this module, hence the `dead_code` allowance; it only keeps
    // the guard's own reference to the slot the owner thread publishes into.
    // NOTE(review): confirm it is still wanted, otherwise remove the field.
    #[allow(dead_code)]
    status: StatusSlot,
}
impl DetailGuard {
    /// Raises the metric detail level for this `sozu top` session and returns
    /// a guard that keeps it raised.
    ///
    /// 1. Opens a dedicated command channel and hands it to an owner thread.
    /// 2. The owner thread sends the initial `SetMetricDetail` request, tagged
    ///    with a `client_id` of the form `top:<pid>:<suffix>`.
    /// 3. Once that succeeds, a renewer thread is started. It periodically
    ///    re-sends the request with the same `ttl_seconds`.
    ///
    /// Renewal failures are reported asynchronously through `status`, not
    /// returned here. Dropping the guard does three things: stops the
    /// renewer, asks the owner to clear the request, and joins both threads.
    ///
    /// # Errors
    ///
    /// - The error from `create_channel` if the command socket cannot be
    ///   opened.
    /// - The error from the initial `SetMetricDetail` exchange, for example
    ///   when the server answers `Failure`.
    /// - `CtlError::WriteRequest(ChannelError::Connection(None))` if the owner
    ///   thread goes away before answering.
    ///
    /// In every error case the owner thread has already been joined.
    pub fn apply(
        config: &Config,
        detail: TopDetail,
        ttl_seconds: u32,
        reason: impl Into<String>,
        status: StatusSlot,
    ) -> Result<Self, CtlError> {
        /// Error reported when the owner thread exits without answering `Apply`.
        fn owner_disconnected() -> CtlError {
            CtlError::WriteRequest(sozu_command_lib::channel::ChannelError::Connection(None))
        }

        let client_id = format!("top:{}:{}", process::id(), short_random_suffix());
        let proto_detail = match detail {
            TopDetail::Process => MetricDetail::DetailProcess,
            TopDetail::Frontend => MetricDetail::DetailFrontend,
            TopDetail::Cluster => MetricDetail::DetailCluster,
            TopDetail::Backend => MetricDetail::DetailBackend,
        };
        let reason: String = reason.into();
        let channel = create_channel(config)?;

        let (tx, rx) = crossbeam_channel::unbounded::<DetailRequest>();
        // `reason` is only needed by the owner thread, so it is moved there.
        let owner_handle = spawn_owner(
            channel,
            rx,
            client_id.clone(),
            proto_detail,
            ttl_seconds,
            reason,
            Arc::clone(&status),
        );

        let (apply_reply_tx, apply_reply_rx) = bounded::<Result<(), CtlError>>(1);
        let apply_request = DetailRequest::Apply {
            reply: apply_reply_tx,
        };
        // Two failures both mean the owner thread is gone without answering:
        // the send fails, or the reply sender is dropped unanswered.
        let outcome = match tx.send(apply_request) {
            Ok(()) => apply_reply_rx
                .recv()
                .unwrap_or_else(|_| Err(owner_disconnected())),
            Err(_) => Err(owner_disconnected()),
        };
        if let Err(e) = outcome {
            // The owner exits after a failed `Apply` (or has already stopped),
            // so this join returns promptly.
            let _ = owner_handle.join();
            return Err(e);
        }

        let renewer_stop = Arc::new(AtomicBool::new(false));
        // Nothing is ever sent on this channel. `Drop` disconnects it by
        // dropping the sender, which wakes the renewer immediately.
        let (renewer_wake_tx, renewer_wake_rx) = bounded::<()>(0);
        let renewer_handle = spawn_renewer(
            tx.clone(),
            ttl_seconds,
            Arc::clone(&renewer_stop),
            renewer_wake_rx,
        );

        Ok(Self {
            tx,
            owner_handle: Some(owner_handle),
            renewer_stop,
            renewer_wake_tx: Some(renewer_wake_tx),
            renewer_handle: Some(renewer_handle),
            client_id,
            status,
        })
    }
}
impl Drop for DetailGuard {
    /// Tears the session down in two stages.
    ///
    /// 1. Stop and join the renewer.
    /// 2. Ask the owner thread to clear the detail request and exit, then
    ///    join it.
    ///
    /// Every failure is ignored: this is best-effort cleanup.
    fn drop(&mut self) {
        // Stage 1: raise the stop flag, then disconnect the wake channel so
        // the renewer's `select!` returns at once instead of waiting for its
        // timer.
        self.renewer_stop.store(true, Ordering::Relaxed);
        self.renewer_wake_tx = None;
        if let Some(renewer) = self.renewer_handle.take() {
            let _ = renewer.join();
        }

        // Stage 2: the renewer is joined, so no `Renew` can be queued behind
        // these. The sends fail only if the owner already exited (e.g. after
        // a failed renewal); the request is then left to lapse via its TTL.
        for request in [DetailRequest::Clear, DetailRequest::Stop] {
            let _ = self.tx.send(request);
        }
        if let Some(owner) = self.owner_handle.take() {
            let _ = owner.join();
        }
    }
}
/// Spawns the thread that owns `channel` and performs every exchange with
/// the server on behalf of a [`DetailGuard`].
///
/// The thread handles [`DetailRequest`]s from `rx` one at a time:
/// - `Apply` sends the initial request and replies with the result. On
///   failure the thread exits.
/// - `Renew` re-sends the request with a `(renew)` reason. On failure it
///   publishes a message to `status` and the thread exits.
/// - `Clear` sends a clear request with a `(clear)` reason, ignoring the
///   outcome.
/// - `Stop` ends the thread. So does every sender being dropped.
///
/// # Panics
///
/// Panics if the operating system refuses to spawn the thread.
fn spawn_owner(
    mut channel: Channel<Request, Response>,
    rx: Receiver<DetailRequest>,
    client_id: String,
    detail: MetricDetail,
    ttl_seconds: u32,
    reason: String,
    status: StatusSlot,
) -> JoinHandle<()> {
    let owner_loop = move || {
        let renew_reason = format!("{reason} (renew)");
        let clear_reason = format!("{reason} (clear)");

        for request in rx.iter() {
            match request {
                DetailRequest::Apply { reply } => {
                    let outcome = send_set_detail(
                        &mut channel,
                        &client_id,
                        Some(detail),
                        Some(ttl_seconds),
                        Some(reason.as_str()),
                        false,
                    );
                    let should_exit = outcome.is_err();
                    let _ = reply.send(outcome);
                    if should_exit {
                        break;
                    }
                }
                DetailRequest::Renew => {
                    let renewed = send_set_detail(
                        &mut channel,
                        &client_id,
                        Some(detail),
                        Some(ttl_seconds),
                        Some(renew_reason.as_str()),
                        false,
                    );
                    if let Err(err) = renewed {
                        publish_status(
                            &status,
                            format!(
                                "renewer dropped: {err}; cardinality lapses in ≤ {ttl_seconds}s"
                            ),
                        );
                        break;
                    }
                }
                DetailRequest::Clear => {
                    let _ = send_set_detail(
                        &mut channel,
                        &client_id,
                        None,
                        None,
                        Some(clear_reason.as_str()),
                        true,
                    );
                }
                DetailRequest::Stop => break,
            }
        }
    };

    std::thread::Builder::new()
        .name("sozu-top-detail-owner".into())
        .spawn(owner_loop)
        .expect("spawn sozu-top owner")
}
/// Spawns the renewer thread of a [`DetailGuard`].
///
/// Every half-TTL the thread queues `DetailRequest::Renew` on `tx` for the
/// owner thread to send. It exits in three cases:
/// - `wake_rx` becomes ready or disconnected (the guard drops its sender);
/// - the timer fires after `stop` has been set;
/// - `tx` can no longer deliver, because the owner thread has exited.
///
/// # Panics
///
/// Panics if the operating system refuses to spawn the thread.
fn spawn_renewer(
    tx: Sender<DetailRequest>,
    ttl_seconds: u32,
    stop: Arc<AtomicBool>,
    wake_rx: Receiver<()>,
) -> JoinHandle<()> {
    // Half the TTL, computed in milliseconds so short TTLs keep a real
    // margin. Whole-second halving would renew a 1 s TTL only at the 1 s mark,
    // i.e. exactly when the request lapses. A zero TTL is treated as 1 s so
    // the interval is never zero, which would make this loop spin.
    let renew_after = Duration::from_millis(u64::from(ttl_seconds.max(1)) * 500);
    std::thread::Builder::new()
        .name("sozu-top-detail-renewer".into())
        .spawn(move || {
            loop {
                let timer = after(renew_after);
                select! {
                    recv(timer) -> _ => {
                        // `select!` picks randomly among ready arms. If the
                        // timer won the race against a shutdown, honour the
                        // stop flag instead of queuing one more renewal.
                        if stop.load(Ordering::Relaxed) {
                            return;
                        }
                        if tx.send(DetailRequest::Renew).is_err() {
                            return;
                        }
                    }
                    recv(wake_rx) -> _ => {
                        return;
                    }
                }
            }
        })
        .expect("spawn sozu-top renewer")
}
/// Sends one `SetMetricDetail` request for `client_id` over `channel` and
/// waits for its final status.
///
/// `Processing` responses are skipped. Each individual read waits at most
/// 5 s.
///
/// # Errors
///
/// - `CtlError::WriteRequest` if the request cannot be written.
/// - `CtlError::ReadBlocking` if a read fails (presumably including the 5 s
///   timeout elapsing — depends on `Channel`).
/// - `CtlError::WrongResponse`, carrying the response, when the server
///   answers `Failure`.
fn send_set_detail(
    channel: &mut Channel<Request, Response>,
    client_id: &str,
    detail: Option<MetricDetail>,
    ttl_seconds: Option<u32>,
    reason: Option<&str>,
    clear: bool,
) -> Result<(), CtlError> {
    let payload = SetMetricDetail {
        client_id: client_id.to_owned(),
        detail: detail.map(|level| level as i32),
        ttl_seconds,
        clear: Some(clear),
        reason: reason.map(str::to_owned),
        peer_pid: None,
        peer_session_ulid: None,
    };
    let request = Request {
        request_type: Some(RequestType::SetMetricDetail(payload)),
    };
    channel
        .write_message(&request)
        .map_err(CtlError::WriteRequest)?;

    let read_timeout = Some(Duration::from_secs(5));
    loop {
        let response = channel
            .read_message_blocking_timeout(read_timeout)
            .map_err(CtlError::ReadBlocking)?;
        match response.status() {
            ResponseStatus::Ok => break Ok(()),
            ResponseStatus::Failure => break Err(CtlError::WrongResponse(response)),
            ResponseStatus::Processing => {}
        }
    }
}
/// Returns an 8-hex-digit suffix that distinguishes this session's
/// `client_id` from others in the same process.
///
/// Uses four bytes from the OS CSPRNG when available. Otherwise it falls back
/// to the sub-second nanoseconds of the wall clock, or 0 if the clock reads
/// before the Unix epoch.
fn short_random_suffix() -> String {
    let mut entropy = [0u8; 4];
    let value = if read_csprng_bytes(&mut entropy) {
        u32::from_le_bytes(entropy)
    } else {
        use std::time::{SystemTime, UNIX_EPOCH};
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.subsec_nanos())
    };
    format!("{value:08x}")
}
fn read_csprng_bytes(buf: &mut [u8]) -> bool {
#[cfg(target_os = "linux")]
{
let ret = unsafe {
libc::getrandom(
buf.as_mut_ptr().cast::<libc::c_void>(),
buf.len(),
libc::GRND_NONBLOCK,
)
};
if ret as usize == buf.len() {
return true;
}
}
use std::io::Read;
if let Ok(mut f) = std::fs::File::open("/dev/urandom")
&& f.read_exact(buf).is_ok()
{
return true;
}
false
}
#[cfg(test)]
mod tests {
    use std::time::Instant;

    use super::*;

    /// With a 2 s TTL the first `Renew` must arrive after roughly one second,
    /// not noticeably sooner.
    #[test]
    fn renewer_sends_renew_after_ttl_half() {
        let (mailbox_tx, mailbox_rx) = crossbeam_channel::unbounded::<DetailRequest>();
        let stop_flag = Arc::new(AtomicBool::new(false));
        let (wake_tx, wake_rx) = bounded::<()>(0);
        let spawned_at = Instant::now();
        let renewer = spawn_renewer(mailbox_tx, 2, Arc::clone(&stop_flag), wake_rx);

        let first = mailbox_rx
            .recv_timeout(Duration::from_secs(3))
            .expect("renewer produced no Renew within 3 s");
        assert!(
            matches!(first, DetailRequest::Renew),
            "first mailbox message must be Renew"
        );
        assert!(
            spawned_at.elapsed() >= Duration::from_millis(900),
            "renewer fired too early: {:?}",
            spawned_at.elapsed()
        );

        // Stop the renewer the same way `DetailGuard`'s `Drop` does.
        stop_flag.store(true, Ordering::Relaxed);
        drop(wake_tx);
        renewer.join().expect("renewer panicked");
    }

    /// With a 60 s TTL the timer cannot fire during the test. The renewer
    /// therefore exits promptly only if dropping the wake sender interrupts
    /// its wait.
    #[test]
    fn renewer_wakes_on_drop() {
        let (mailbox_tx, _mailbox_rx) = crossbeam_channel::unbounded::<DetailRequest>();
        let stop_flag = Arc::new(AtomicBool::new(false));
        let (wake_tx, wake_rx) = bounded::<()>(0);
        let renewer = spawn_renewer(mailbox_tx, 60, Arc::clone(&stop_flag), wake_rx);

        // Best effort: let the renewer reach its `select!` wait first.
        std::thread::sleep(Duration::from_millis(50));
        stop_flag.store(true, Ordering::Relaxed);
        drop(wake_tx);

        let joined_from = Instant::now();
        renewer.join().expect("renewer panicked");
        assert!(
            joined_from.elapsed() < Duration::from_secs(2),
            "renewer did not wake on drop: {:?}",
            joined_from.elapsed()
        );
    }
}