1use crate::error::{Error, Result};
2use crate::wire::{read_frame, write_frame};
3use falcorn_proto::logging::{
4 PLUGIN_FILTER_ALL, PLUGIN_FRAME_MAGIC, PLUGIN_PROTOCOL_VERSION, PluginErrorFrame, PluginEvent,
5 PluginFrameType, PluginPingFrame, PluginPongFrame, PluginSubscribeAck, PluginSubscribeRequest,
6};
7use std::os::unix::net::UnixStream;
8use std::time::Duration;
9
10#[derive(Clone, Debug)]
11pub struct LoggingClientBuilder {
12 socket: String,
13 auth_token: Option<String>,
14 client_name: Option<String>,
15 filter_mask: u8,
16 read_timeout: Option<Duration>,
17 write_timeout: Option<Duration>,
18}
19
20impl LoggingClientBuilder {
21 pub fn new(socket: impl Into<String>) -> Self {
22 Self {
23 socket: socket.into(),
24 auth_token: None,
25 client_name: None,
26 filter_mask: PLUGIN_FILTER_ALL,
27 read_timeout: None,
28 write_timeout: None,
29 }
30 }
31
32 pub fn auth_token(mut self, token: impl Into<String>) -> Self {
33 self.auth_token = Some(token.into());
34 self
35 }
36
37 pub fn client_name(mut self, name: impl Into<String>) -> Self {
38 self.client_name = Some(name.into());
39 self
40 }
41
42 pub fn filter_mask(mut self, mask: u8) -> Self {
43 self.filter_mask = mask;
44 self
45 }
46
47 pub fn read_timeout(mut self, timeout: Duration) -> Self {
48 self.read_timeout = Some(timeout);
49 self
50 }
51
52 pub fn write_timeout(mut self, timeout: Duration) -> Self {
53 self.write_timeout = Some(timeout);
54 self
55 }
56
57 pub fn connect(self) -> Result<LoggingClient> {
58 let mut stream = UnixStream::connect(&self.socket)?;
59 stream.set_read_timeout(self.read_timeout)?;
60 stream.set_write_timeout(self.write_timeout)?;
61
62 let subscribe = PluginSubscribeRequest {
63 filter_mask: self.filter_mask,
64 auth_token: self.auth_token,
65 client_name: self.client_name,
66 };
67 let payload = bincode::serialize(&subscribe)?;
68 write_frame(
69 &mut stream,
70 PLUGIN_FRAME_MAGIC,
71 PLUGIN_PROTOCOL_VERSION,
72 PluginFrameType::Subscribe as u8,
73 &payload,
74 )?;
75
76 let (ft, payload) = read_frame(&mut stream, PLUGIN_FRAME_MAGIC, PLUGIN_PROTOCOL_VERSION)?;
77 match PluginFrameType::from_u8(ft) {
78 Some(PluginFrameType::Ack) => {
79 let _ack: PluginSubscribeAck = bincode::deserialize(&payload)?;
80 }
81 Some(PluginFrameType::Error) => {
82 let err: PluginErrorFrame = bincode::deserialize(&payload)?;
83 return Err(Error::Remote {
84 code: err.code,
85 message: err.message,
86 });
87 }
88 _ => {
89 return Err(Error::Protocol("unexpected first frame".to_string()));
90 }
91 }
92
93 Ok(LoggingClient { stream })
94 }
95}
96
97pub struct LoggingClient {
98 stream: UnixStream,
99}
100
101impl LoggingClient {
102 pub fn builder(socket: impl Into<String>) -> LoggingClientBuilder {
103 LoggingClientBuilder::new(socket)
104 }
105
106 pub fn next_event(&mut self) -> Result<PluginEvent> {
107 loop {
108 let (ft, payload) = read_frame(
109 &mut self.stream,
110 PLUGIN_FRAME_MAGIC,
111 PLUGIN_PROTOCOL_VERSION,
112 )?;
113 match PluginFrameType::from_u8(ft) {
114 Some(PluginFrameType::Event) => {
115 let evt: PluginEvent = bincode::deserialize(&payload)?;
116 return Ok(evt);
117 }
118 Some(PluginFrameType::Ping) => {
119 let ping: PluginPingFrame = bincode::deserialize(&payload)?;
120 let pong = PluginPongFrame {
121 ts_millis: ping.ts_millis,
122 };
123 let payload = bincode::serialize(&pong)?;
124 write_frame(
125 &mut self.stream,
126 PLUGIN_FRAME_MAGIC,
127 PLUGIN_PROTOCOL_VERSION,
128 PluginFrameType::Pong as u8,
129 &payload,
130 )?;
131 }
132 Some(PluginFrameType::Error) => {
133 let err: PluginErrorFrame = bincode::deserialize(&payload)?;
134 return Err(Error::Remote {
135 code: err.code,
136 message: err.message,
137 });
138 }
139 _ => {
140 return Err(Error::Protocol("unexpected frame".to_string()));
141 }
142 }
143 }
144 }
145}