use crate::error::{Error, Result};
use crate::wire::{read_frame, write_frame};
use falcorn_proto::logging::{
PLUGIN_FILTER_ALL, PLUGIN_FRAME_MAGIC, PLUGIN_PROTOCOL_VERSION, PluginErrorFrame, PluginEvent,
PluginFrameType, PluginPingFrame, PluginPongFrame, PluginSubscribeAck, PluginSubscribeRequest,
};
use std::os::unix::net::UnixStream;
use std::time::Duration;
#[derive(Clone, Debug)]
pub struct LoggingClientBuilder {
socket: String,
auth_token: Option<String>,
client_name: Option<String>,
filter_mask: u8,
read_timeout: Option<Duration>,
write_timeout: Option<Duration>,
}
impl LoggingClientBuilder {
pub fn new(socket: impl Into<String>) -> Self {
Self {
socket: socket.into(),
auth_token: None,
client_name: None,
filter_mask: PLUGIN_FILTER_ALL,
read_timeout: None,
write_timeout: None,
}
}
pub fn auth_token(mut self, token: impl Into<String>) -> Self {
self.auth_token = Some(token.into());
self
}
pub fn client_name(mut self, name: impl Into<String>) -> Self {
self.client_name = Some(name.into());
self
}
pub fn filter_mask(mut self, mask: u8) -> Self {
self.filter_mask = mask;
self
}
pub fn read_timeout(mut self, timeout: Duration) -> Self {
self.read_timeout = Some(timeout);
self
}
pub fn write_timeout(mut self, timeout: Duration) -> Self {
self.write_timeout = Some(timeout);
self
}
pub fn connect(self) -> Result<LoggingClient> {
let mut stream = UnixStream::connect(&self.socket)?;
stream.set_read_timeout(self.read_timeout)?;
stream.set_write_timeout(self.write_timeout)?;
let subscribe = PluginSubscribeRequest {
filter_mask: self.filter_mask,
auth_token: self.auth_token,
client_name: self.client_name,
};
let payload = bincode::serialize(&subscribe)?;
write_frame(
&mut stream,
PLUGIN_FRAME_MAGIC,
PLUGIN_PROTOCOL_VERSION,
PluginFrameType::Subscribe as u8,
&payload,
)?;
let (ft, payload) = read_frame(&mut stream, PLUGIN_FRAME_MAGIC, PLUGIN_PROTOCOL_VERSION)?;
match PluginFrameType::from_u8(ft) {
Some(PluginFrameType::Ack) => {
let _ack: PluginSubscribeAck = bincode::deserialize(&payload)?;
}
Some(PluginFrameType::Error) => {
let err: PluginErrorFrame = bincode::deserialize(&payload)?;
return Err(Error::Remote {
code: err.code,
message: err.message,
});
}
_ => {
return Err(Error::Protocol("unexpected first frame".to_string()));
}
}
Ok(LoggingClient { stream })
}
}
pub struct LoggingClient {
stream: UnixStream,
}
impl LoggingClient {
pub fn builder(socket: impl Into<String>) -> LoggingClientBuilder {
LoggingClientBuilder::new(socket)
}
pub fn next_event(&mut self) -> Result<PluginEvent> {
loop {
let (ft, payload) = read_frame(
&mut self.stream,
PLUGIN_FRAME_MAGIC,
PLUGIN_PROTOCOL_VERSION,
)?;
match PluginFrameType::from_u8(ft) {
Some(PluginFrameType::Event) => {
let evt: PluginEvent = bincode::deserialize(&payload)?;
return Ok(evt);
}
Some(PluginFrameType::Ping) => {
let ping: PluginPingFrame = bincode::deserialize(&payload)?;
let pong = PluginPongFrame {
ts_millis: ping.ts_millis,
};
let payload = bincode::serialize(&pong)?;
write_frame(
&mut self.stream,
PLUGIN_FRAME_MAGIC,
PLUGIN_PROTOCOL_VERSION,
PluginFrameType::Pong as u8,
&payload,
)?;
}
Some(PluginFrameType::Error) => {
let err: PluginErrorFrame = bincode::deserialize(&payload)?;
return Err(Error::Remote {
code: err.code,
message: err.message,
});
}
_ => {
return Err(Error::Protocol("unexpected frame".to_string()));
}
}
}
}
}