use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use crossbeam_channel::{Receiver, Sender, TrySendError, bounded};
use sozu_command_lib::{
channel::ChannelError,
config::Config,
proto::command::{
AggregatedMetrics, Event, ListListeners, ListOfCertificatesByAddress, ListenersList,
QueryCertificatesFilters, QueryMetricsOptions, Request, Response, ResponseStatus,
SubscribeEvents, request::RequestType, response_content::ContentType,
},
};
use crate::ctl::create_channel;
use super::CtlError;
use super::cardinality::{StatusSlot, publish_status};
#[derive(Debug, Clone)]
pub struct Snapshot {
pub metrics: AggregatedMetrics,
pub received_at: Instant,
}
#[derive(Debug, Clone)]
pub struct TopEvent {
pub event: Event,
pub received_at: Instant,
}
#[derive(Debug, Clone)]
pub struct ListenersSnapshot {
pub list: ListenersList,
}
#[derive(Debug, Clone)]
pub struct CertsSnapshot {
pub list: ListOfCertificatesByAddress,
}
const EVENTS_CAP: usize = 64;
const EVENTS_READ_TIMEOUT: Duration = Duration::from_secs(1);
const SNAPSHOT_CAP: usize = 1;
fn poll_loop<T, F>(
label: &'static str,
interval: Duration,
tx: Sender<T>,
status: StatusSlot,
mut channel: sozu_command_lib::channel::Channel<Request, Response>,
mut poll: F,
) where
F: FnMut(&mut sozu_command_lib::channel::Channel<Request, Response>) -> Result<T, String>,
{
loop {
let started = Instant::now();
match poll(&mut channel) {
Ok(v) => match tx.try_send(v) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {}
Err(TrySendError::Disconnected(_)) => return,
},
Err(err) => {
publish_status(&status, format!("{label} poll error: {err}"));
}
}
let elapsed = started.elapsed();
if elapsed < interval {
std::thread::sleep(interval - elapsed);
}
}
}
pub fn spawn_collector(
config: Config,
refresh_ms: u64,
status: StatusSlot,
) -> Result<(Receiver<Snapshot>, std::thread::JoinHandle<()>), CtlError> {
let channel = create_channel(&config)?;
let (tx, rx) = bounded::<Snapshot>(SNAPSHOT_CAP);
let interval = Duration::from_millis(refresh_ms);
let handle = std::thread::Builder::new()
.name("sozu-top-collector".into())
.spawn(move || {
poll_loop("snapshot", interval, tx, status, channel, |ch| {
poll_metrics(ch).map(|metrics| Snapshot {
metrics,
received_at: Instant::now(),
})
})
})
.map_err(|source| CtlError::SpawnFailed {
label: "sozu-top-collector",
source,
})?;
Ok((rx, handle))
}
fn poll_metrics(
channel: &mut sozu_command_lib::channel::Channel<Request, Response>,
) -> Result<AggregatedMetrics, String> {
let req = Request {
request_type: Some(RequestType::QueryMetrics(QueryMetricsOptions {
list: false,
cluster_ids: vec![],
backend_ids: vec![],
metric_names: vec![],
no_clusters: false,
workers: false,
})),
};
channel
.write_message(&req)
.map_err(|e| format!("write QueryMetrics: {e}"))?;
loop {
let resp = channel
.read_message_blocking_timeout(Some(Duration::from_secs(5)))
.map_err(|e| format!("read QueryMetrics response: {e}"))?;
match resp.status() {
ResponseStatus::Processing => continue,
ResponseStatus::Failure => {
return Err(format!("QueryMetrics failed: {}", resp.message));
}
ResponseStatus::Ok => match resp.content {
Some(content) => match content.content_type {
Some(ContentType::Metrics(m)) => return Ok(m),
other => {
return Err(format!(
"unexpected content variant for QueryMetrics: {}",
content_type_name(other.as_ref()),
));
}
},
None => return Err("QueryMetrics OK with no content".into()),
},
}
}
}
const LISTENERS_INTERVAL: Duration = Duration::from_secs(5);
const CERTS_INTERVAL: Duration = Duration::from_secs(30);
pub fn spawn_listeners(
config: Config,
status: StatusSlot,
) -> Result<(Receiver<ListenersSnapshot>, std::thread::JoinHandle<()>), CtlError> {
let channel = create_channel(&config)?;
let (tx, rx) = bounded::<ListenersSnapshot>(SNAPSHOT_CAP);
let handle = std::thread::Builder::new()
.name("sozu-top-listeners".into())
.spawn(move || {
poll_loop("listeners", LISTENERS_INTERVAL, tx, status, channel, |ch| {
poll_listeners(ch).map(|list| ListenersSnapshot { list })
})
})
.map_err(|source| CtlError::SpawnFailed {
label: "sozu-top-listeners",
source,
})?;
Ok((rx, handle))
}
fn poll_listeners(
channel: &mut sozu_command_lib::channel::Channel<Request, Response>,
) -> Result<ListenersList, String> {
let req = Request {
request_type: Some(RequestType::ListListeners(ListListeners {})),
};
channel
.write_message(&req)
.map_err(|e| format!("write ListListeners: {e}"))?;
loop {
let resp = channel
.read_message_blocking_timeout(Some(Duration::from_secs(5)))
.map_err(|e| format!("read ListListeners response: {e}"))?;
match resp.status() {
ResponseStatus::Processing => continue,
ResponseStatus::Failure => {
return Err(format!("ListListeners failed: {}", resp.message));
}
ResponseStatus::Ok => match resp.content {
Some(content) => match content.content_type {
Some(ContentType::ListenersList(l)) => return Ok(l),
other => {
return Err(format!(
"unexpected content variant for ListListeners: {}",
content_type_name(other.as_ref()),
));
}
},
None => return Err("ListListeners OK with no content".into()),
},
}
}
}
pub fn spawn_certs(
config: Config,
status: StatusSlot,
) -> Result<(Receiver<CertsSnapshot>, std::thread::JoinHandle<()>), CtlError> {
let channel = create_channel(&config)?;
let (tx, rx) = bounded::<CertsSnapshot>(SNAPSHOT_CAP);
let handle = std::thread::Builder::new()
.name("sozu-top-certs".into())
.spawn(move || {
poll_loop("certs", CERTS_INTERVAL, tx, status, channel, |ch| {
poll_certs(ch).map(|list| CertsSnapshot { list })
})
})
.map_err(|source| CtlError::SpawnFailed {
label: "sozu-top-certs",
source,
})?;
Ok((rx, handle))
}
fn poll_certs(
channel: &mut sozu_command_lib::channel::Channel<Request, Response>,
) -> Result<ListOfCertificatesByAddress, String> {
let req = Request {
request_type: Some(RequestType::QueryCertificatesFromTheState(
QueryCertificatesFilters {
domain: None,
fingerprint: None,
},
)),
};
channel
.write_message(&req)
.map_err(|e| format!("write QueryCertificatesFromTheState: {e}"))?;
loop {
let resp = channel
.read_message_blocking_timeout(Some(Duration::from_secs(5)))
.map_err(|e| format!("read QueryCertificatesFromTheState response: {e}"))?;
match resp.status() {
ResponseStatus::Processing => continue,
ResponseStatus::Failure => {
return Err(format!(
"QueryCertificatesFromTheState failed: {}",
resp.message
));
}
ResponseStatus::Ok => match resp.content {
Some(content) => match content.content_type {
Some(ContentType::CertificatesByAddress(l)) => return Ok(l),
Some(ContentType::CertificatesWithFingerprints(map)) => {
return Ok(certs_from_fingerprint_map(map));
}
other => {
return Err(format!(
"unexpected content variant for QueryCertificatesFromTheState: {}",
content_type_name(other.as_ref()),
));
}
},
None => return Err("QueryCertificatesFromTheState OK with no content".into()),
},
}
}
}
fn certs_from_fingerprint_map(
payload: sozu_command_lib::proto::command::CertificatesWithFingerprints,
) -> ListOfCertificatesByAddress {
use sozu_command_lib::proto::command::{
CertificateSummary, CertificatesByAddress, SocketAddress,
};
let mut summaries: Vec<CertificateSummary> = Vec::with_capacity(payload.certs.len());
for (fingerprint, cert) in payload.certs {
let domain = cert
.names
.into_iter()
.next()
.unwrap_or_else(|| "<unknown>".to_owned());
summaries.push(CertificateSummary {
fingerprint,
domain,
});
}
ListOfCertificatesByAddress {
certificates: vec![CertificatesByAddress {
address: SocketAddress {
ip: sozu_command_lib::proto::command::IpAddress {
inner: Some(sozu_command_lib::proto::command::ip_address::Inner::V4(0)),
},
port: 0,
},
certificate_summaries: summaries,
}],
}
}
fn content_type_name(ct: Option<&ContentType>) -> &'static str {
match ct {
None => "<none>",
Some(ContentType::Workers(_)) => "Workers",
Some(ContentType::Metrics(_)) => "Metrics",
Some(ContentType::WorkerResponses(_)) => "WorkerResponses",
Some(ContentType::Event(_)) => "Event",
Some(ContentType::FrontendList(_)) => "FrontendList",
Some(ContentType::ListenersList(_)) => "ListenersList",
Some(ContentType::WorkerMetrics(_)) => "WorkerMetrics",
Some(ContentType::AvailableMetrics(_)) => "AvailableMetrics",
Some(ContentType::Clusters(_)) => "Clusters",
Some(ContentType::ClusterHashes(_)) => "ClusterHashes",
Some(ContentType::CertificatesByAddress(_)) => "CertificatesByAddress",
Some(ContentType::CertificatesWithFingerprints(_)) => "CertificatesWithFingerprints",
Some(ContentType::RequestCounts(_)) => "RequestCounts",
Some(ContentType::MaxConnectionsPerIpLimit(_)) => "MaxConnectionsPerIpLimit",
Some(ContentType::HealthChecksList(_)) => "HealthChecksList",
Some(ContentType::MetricDetailStatus(_)) => "MetricDetailStatus",
Some(ContentType::WorkerMetricDetailStatus(_)) => "WorkerMetricDetailStatus",
}
}
pub fn spawn_events(
config: Config,
shutdown: Arc<AtomicBool>,
status: StatusSlot,
) -> Result<(Receiver<TopEvent>, std::thread::JoinHandle<()>), CtlError> {
let mut channel = create_channel(&config)?;
let (tx, rx) = bounded::<TopEvent>(EVENTS_CAP);
let handle = std::thread::Builder::new()
.name("sozu-top-events".into())
.spawn(move || events_loop(&mut channel, tx, shutdown, status))
.map_err(|source| CtlError::SpawnFailed {
label: "sozu-top-events",
source,
})?;
Ok((rx, handle))
}
fn events_loop(
channel: &mut sozu_command_lib::channel::Channel<Request, Response>,
tx: Sender<TopEvent>,
shutdown: Arc<AtomicBool>,
status: StatusSlot,
) {
let req = Request {
request_type: Some(RequestType::SubscribeEvents(SubscribeEvents {})),
};
if let Err(e) = channel.write_message(&req) {
publish_status(&status, format!("events: SubscribeEvents write error: {e}"));
return;
}
while !shutdown.load(Ordering::Relaxed) {
let resp = match channel.read_message_blocking_timeout(Some(EVENTS_READ_TIMEOUT)) {
Ok(r) => r,
Err(ChannelError::TimeoutReached(_)) => continue,
Err(e) => {
publish_status(&status, format!("events: read error: {e}"));
return;
}
};
match resp.status() {
ResponseStatus::Processing => {
if let Some(ev) = unwrap_event(resp) {
let topev = TopEvent {
event: ev,
received_at: Instant::now(),
};
let _ = tx.try_send(topev);
}
}
ResponseStatus::Ok | ResponseStatus::Failure => return,
}
}
}
fn unwrap_event(resp: Response) -> Option<Event> {
match resp.content?.content_type? {
ContentType::Event(ev) => Some(ev),
_ => None,
}
}