use std::{fmt::Debug, sync::Arc, time::SystemTime};
use libc::pid_t;
use mio::Token;
use prost::Message;
use rusty_ulid::Ulid;
use sozu_command_lib::{
channel::Channel,
proto::command::{
Request, Response, ResponseContent, ResponseStatus, RunState, WorkerInfo, WorkerRequest,
WorkerResponse,
},
ready::Ready,
scm_socket::ScmSocket,
};
use crate::command::server::{ClientId, MessageClient, PeerCred, WorkerId};
#[derive(Debug)]
pub struct ClientSession {
pub channel: Channel<Response, Request>,
pub id: ClientId,
pub session_ulid: Ulid,
pub token: Token,
pub actor_uid: Option<u32>,
pub actor_gid: Option<u32>,
pub actor_pid: Option<i32>,
pub actor_comm: Option<String>,
pub actor_user: Option<String>,
pub socket_path: Arc<str>,
pub connect_ts: SystemTime,
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum ClientResult {
NothingToDo,
NewRequest(Request),
CloseSession,
}
impl ClientSession {
pub fn new(
mut channel: Channel<Response, Request>,
id: ClientId,
token: Token,
peer_cred: PeerCred,
actor_comm: Option<String>,
actor_user: Option<String>,
socket_path: Arc<str>,
) -> Self {
channel.interest = Ready::READABLE | Ready::ERROR | Ready::HUP;
Self {
channel,
id,
session_ulid: Ulid::generate(),
token,
actor_uid: peer_cred.uid,
actor_gid: peer_cred.gid,
actor_pid: peer_cred.pid,
actor_comm,
actor_user,
socket_path,
connect_ts: SystemTime::now(),
}
}
pub fn actor_uid_display(&self) -> String {
display_or_unknown(self.actor_uid)
}
pub fn actor_gid_display(&self) -> String {
display_or_unknown(self.actor_gid)
}
pub fn actor_pid_display(&self) -> String {
display_or_unknown(self.actor_pid)
}
pub fn connect_ts_display(&self) -> String {
crate::command::requests::rfc3339_utc(self.connect_ts)
}
pub fn actor_comm_display(&self) -> String {
display_sanitized_or_unknown(self.actor_comm.as_deref())
}
pub fn actor_user_display(&self) -> String {
display_sanitized_or_unknown(self.actor_user.as_deref())
}
fn send(&mut self, response: Response) {
if let Err(e) = self.channel.write_message(&response) {
error!("error writing on channel: {}", e);
self.channel.readiness = Ready::ERROR;
return;
}
self.channel.interest.insert(Ready::WRITABLE);
}
pub fn update_readiness(&mut self, events: Ready) {
self.channel.handle_events(events);
}
pub fn ready(&mut self) -> ClientResult {
if self.channel.readiness.is_error() || self.channel.readiness.is_hup() {
return ClientResult::CloseSession;
}
let status = self.channel.writable();
trace!("client writable: {:?}", status);
let mut requests = extract_messages(&mut self.channel);
match requests.pop() {
Some(request) => {
if !requests.is_empty() {
error!("more than one request at a time");
}
ClientResult::NewRequest(request)
}
None => ClientResult::NothingToDo,
}
}
}
pub fn sanitize_for_audit(s: &str) -> String {
if s.chars().all(|c| !is_unsafe_line(c)) {
return s.to_owned();
}
s.chars()
.map(|c| if is_unsafe_line(c) { '?' } else { c })
.collect()
}
pub fn sanitize_for_audit_kv(s: &str) -> String {
if s.chars().all(|c| !is_unsafe_kv(c)) {
return s.to_owned();
}
s.chars()
.map(|c| if is_unsafe_kv(c) { '?' } else { c })
.collect()
}
#[inline]
fn is_unsafe_line(c: char) -> bool {
c.is_control()
|| c == '\u{feff}'
|| c == '\u{2028}'
|| c == '\u{2029}'
|| matches!(c, '\u{202A}'..='\u{202E}' | '\u{2066}'..='\u{2069}')
}
#[inline]
fn is_unsafe_kv(c: char) -> bool {
is_unsafe_line(c) || c == ',' || c == '='
}
pub fn display_or_unknown<T: ToString>(value: Option<T>) -> String {
match value {
Some(v) => v.to_string(),
None => String::from("unknown"),
}
}
pub fn display_sanitized_or_unknown(value: Option<&str>) -> String {
match value {
Some(s) => sanitize_for_audit(s),
None => String::from("unknown"),
}
}
impl MessageClient for ClientSession {
fn finish_ok<T: Into<String>>(&mut self, message: T) {
let message = message.into();
debug!("{}", message);
self.send(Response {
status: ResponseStatus::Ok.into(),
message,
content: None,
})
}
fn finish_ok_with_content<T: Into<String>>(&mut self, content: ResponseContent, message: T) {
let message = message.into();
debug!("{}", message);
self.send(Response {
status: ResponseStatus::Ok.into(),
message,
content: Some(content),
})
}
fn finish_failure<T: Into<String>>(&mut self, message: T) {
let message = message.into();
error!("{}", message);
self.send(Response {
status: ResponseStatus::Failure.into(),
message,
content: None,
})
}
fn return_processing<S: Into<String>>(&mut self, message: S) {
let message = message.into();
debug!("{}", message);
self.send(Response {
status: ResponseStatus::Processing.into(),
message,
content: None,
});
}
fn return_processing_with_content<S: Into<String>>(
&mut self,
message: S,
content: ResponseContent,
) {
let message = message.into();
debug!("{}", message);
self.send(Response {
status: ResponseStatus::Processing.into(),
message,
content: Some(content),
});
}
}
pub type OptionalClient<'a> = Option<&'a mut ClientSession>;
impl MessageClient for OptionalClient<'_> {
fn finish_ok<T: Into<String>>(&mut self, message: T) {
match self {
None => debug!("{}", message.into()),
Some(client) => client.finish_ok(message),
}
}
fn finish_ok_with_content<T: Into<String>>(&mut self, content: ResponseContent, message: T) {
match self {
None => debug!("{}", message.into()),
Some(client) => client.finish_ok_with_content(content, message),
}
}
fn finish_failure<T: Into<String>>(&mut self, message: T) {
match self {
None => error!("{}", message.into()),
Some(client) => client.finish_failure(message),
}
}
fn return_processing<T: Into<String>>(&mut self, message: T) {
match self {
None => debug!("{}", message.into()),
Some(client) => client.return_processing(message),
}
}
fn return_processing_with_content<S: Into<String>>(
&mut self,
message: S,
content: ResponseContent,
) {
match self {
None => debug!("{}", message.into()),
Some(client) => client.return_processing_with_content(message, content),
}
}
}
#[derive(Debug)]
pub struct WorkerSession {
pub channel: Channel<WorkerRequest, WorkerResponse>,
pub id: WorkerId,
pub pid: pid_t,
pub run_state: RunState,
pub scm_socket: ScmSocket,
pub token: Token,
}
#[derive(Debug)]
pub enum WorkerResult {
NothingToDo,
NewResponses(Vec<WorkerResponse>),
CloseSession,
}
impl WorkerSession {
pub fn new(
mut channel: Channel<WorkerRequest, WorkerResponse>,
id: WorkerId,
pid: pid_t,
token: Token,
scm_socket: ScmSocket,
) -> Self {
channel.interest = Ready::READABLE | Ready::ERROR | Ready::HUP;
Self {
channel,
id,
pid,
run_state: RunState::Running,
scm_socket,
token,
}
}
pub fn send(&mut self, request: &WorkerRequest) {
trace!("Sending to worker: {:?}", request);
if let Err(e) = self.channel.write_message(request) {
error!("Could not send request to worker: {}", e);
self.channel.readiness = Ready::ERROR;
return;
}
self.channel.interest.insert(Ready::WRITABLE);
}
pub fn update_readiness(&mut self, events: Ready) {
self.channel.handle_events(events);
}
pub fn ready(&mut self) -> WorkerResult {
let status = self.channel.writable();
trace!("Worker writable: {:?}", status);
let responses = extract_messages(&mut self.channel);
if !responses.is_empty() {
return WorkerResult::NewResponses(responses);
}
if self.channel.readiness.is_error() || self.channel.readiness.is_hup() {
debug!("worker {} is unresponsive, closing the session", self.id);
return WorkerResult::CloseSession;
}
WorkerResult::NothingToDo
}
pub fn querying_info(&self) -> WorkerInfo {
let run_state = match self.run_state {
RunState::Stopping => RunState::Stopping,
RunState::Stopped => RunState::Stopped,
RunState::Running | RunState::NotAnswering => RunState::NotAnswering,
};
WorkerInfo {
id: self.id,
pid: self.pid,
run_state: run_state as i32,
}
}
pub fn is_active(&self) -> bool {
self.run_state != RunState::Stopping && self.run_state != RunState::Stopped
}
}
pub fn extract_messages<Tx, Rx>(channel: &mut Channel<Tx, Rx>) -> Vec<Rx>
where
Tx: Debug + Default + Message,
Rx: Debug + Default + Message,
{
let mut messages = Vec::new();
loop {
let status = channel.readable();
trace!("Channel readable: {:?}", status);
let old_capacity = channel.front_buf.capacity();
let message = channel.read_message();
match message {
Ok(message) => messages.push(message),
Err(_) => {
if old_capacity == channel.front_buf.capacity() {
return messages;
}
}
}
}
}
pub fn wants_to_tick<Tx, Rx>(channel: &Channel<Tx, Rx>) -> bool {
(channel.readiness.is_writable() && channel.back_buf.available_data() > 0)
|| (channel.readiness.is_hup() || channel.readiness.is_error())
}
#[cfg(test)]
mod tests {
use super::{sanitize_for_audit, sanitize_for_audit_kv};
#[test]
fn kv_strips_column_comma() {
assert_eq!(sanitize_for_audit_kv("x,y"), "x?y");
}
#[test]
fn kv_strips_column_equals() {
assert_eq!(sanitize_for_audit_kv("x=y"), "x?y");
}
#[test]
fn kv_strips_c1_nel() {
assert_eq!(sanitize_for_audit_kv("x\u{0085}y"), "x?y");
}
#[test]
fn kv_strips_c1_csi() {
assert_eq!(sanitize_for_audit_kv("x\u{009B}y"), "x?y");
}
#[test]
fn kv_strips_bom() {
assert_eq!(sanitize_for_audit_kv("x\u{FEFF}y"), "x?y");
}
#[test]
fn kv_strips_line_separator() {
assert_eq!(sanitize_for_audit_kv("x\u{2028}y"), "x?y");
}
#[test]
fn kv_strips_paragraph_separator() {
assert_eq!(sanitize_for_audit_kv("x\u{2029}y"), "x?y");
}
#[test]
fn kv_preserves_safe_ascii() {
assert_eq!(sanitize_for_audit_kv("safe-id_42"), "safe-id_42");
}
#[test]
fn kv_preserves_in_value_colon() {
assert_eq!(sanitize_for_audit_kv("host:8080"), "host:8080");
}
#[test]
fn line_keeps_comma() {
assert_eq!(sanitize_for_audit("x,y"), "x,y");
}
#[test]
fn line_keeps_equals() {
assert_eq!(sanitize_for_audit("x=y"), "x=y");
}
#[test]
fn line_strips_c1_nel() {
assert_eq!(sanitize_for_audit("x\u{0085}y"), "x?y");
}
#[test]
fn line_strips_c1_csi() {
assert_eq!(sanitize_for_audit("x\u{009B}y"), "x?y");
}
#[test]
fn line_strips_bom() {
assert_eq!(sanitize_for_audit("x\u{FEFF}y"), "x?y");
}
#[test]
fn line_strips_line_separator() {
assert_eq!(sanitize_for_audit("x\u{2028}y"), "x?y");
}
#[test]
fn line_strips_paragraph_separator() {
assert_eq!(sanitize_for_audit("x\u{2029}y"), "x?y");
}
#[test]
fn line_strips_c0_control() {
assert_eq!(sanitize_for_audit("x\ty\nz\0"), "x?y?z?");
}
#[test]
fn line_strips_del() {
assert_eq!(sanitize_for_audit("x\u{007F}y"), "x?y");
}
#[test]
fn line_strips_rtl_override() {
assert_eq!(sanitize_for_audit("a\u{202E}b"), "a?b");
assert_eq!(sanitize_for_audit_kv("a\u{202E}b"), "a?b");
}
#[test]
fn line_strips_bidi_override_range() {
for c in ['\u{202A}', '\u{202B}', '\u{202C}', '\u{202D}', '\u{202E}'] {
let input = format!("a{c}b");
assert_eq!(sanitize_for_audit(&input), "a?b");
assert_eq!(sanitize_for_audit_kv(&input), "a?b");
}
}
#[test]
fn line_strips_bidi_isolate_range() {
for c in ['\u{2066}', '\u{2067}', '\u{2068}', '\u{2069}'] {
let input = format!("a{c}b");
assert_eq!(sanitize_for_audit(&input), "a?b");
assert_eq!(sanitize_for_audit_kv(&input), "a?b");
}
}
#[test]
fn line_preserves_legitimate_bidi_text() {
let input = "héllo שלום مرحبا";
assert_eq!(sanitize_for_audit(input), input);
assert_eq!(sanitize_for_audit_kv(input), input);
}
}