Skip to main content

falcorn_sdk/
logging.rs

1use crate::error::{Error, Result};
2use crate::wire::{read_frame, write_frame};
3use falcorn_proto::logging::{
4    PLUGIN_FILTER_ALL, PLUGIN_FRAME_MAGIC, PLUGIN_PROTOCOL_VERSION, PluginErrorFrame, PluginEvent,
5    PluginFrameType, PluginPingFrame, PluginPongFrame, PluginSubscribeAck, PluginSubscribeRequest,
6};
7use std::os::unix::net::UnixStream;
8use std::time::Duration;
9
/// Collects the settings used to open a [`LoggingClient`] connection.
///
/// Create one with [`LoggingClientBuilder::new`], adjust it with the chained
/// setters, and finish with [`LoggingClientBuilder::connect`].
///
/// `Debug` is implemented by hand so that the auth token is never written to
/// logs or panic messages; only its presence is reported.
#[derive(Clone)]
pub struct LoggingClientBuilder {
    /// Path of the Unix domain socket to connect to.
    socket: String,
    /// Optional token sent in the subscribe request.
    auth_token: Option<String>,
    /// Optional client name sent in the subscribe request.
    client_name: Option<String>,
    /// Filter mask sent in the subscribe request.
    filter_mask: u8,
    /// Read timeout applied to the socket; `None` leaves reads blocking.
    read_timeout: Option<Duration>,
    /// Write timeout applied to the socket; `None` leaves writes blocking.
    write_timeout: Option<Duration>,
}

impl std::fmt::Debug for LoggingClientBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Show whether a token is configured without revealing its value.
        let auth_token = self.auth_token.as_ref().map(|_| "<redacted>");
        f.debug_struct("LoggingClientBuilder")
            .field("socket", &self.socket)
            .field("auth_token", &auth_token)
            .field("client_name", &self.client_name)
            .field("filter_mask", &self.filter_mask)
            .field("read_timeout", &self.read_timeout)
            .field("write_timeout", &self.write_timeout)
            .finish()
    }
}
19
20impl LoggingClientBuilder {
21    pub fn new(socket: impl Into<String>) -> Self {
22        Self {
23            socket: socket.into(),
24            auth_token: None,
25            client_name: None,
26            filter_mask: PLUGIN_FILTER_ALL,
27            read_timeout: None,
28            write_timeout: None,
29        }
30    }
31
32    pub fn auth_token(mut self, token: impl Into<String>) -> Self {
33        self.auth_token = Some(token.into());
34        self
35    }
36
37    pub fn client_name(mut self, name: impl Into<String>) -> Self {
38        self.client_name = Some(name.into());
39        self
40    }
41
42    pub fn filter_mask(mut self, mask: u8) -> Self {
43        self.filter_mask = mask;
44        self
45    }
46
47    pub fn read_timeout(mut self, timeout: Duration) -> Self {
48        self.read_timeout = Some(timeout);
49        self
50    }
51
52    pub fn write_timeout(mut self, timeout: Duration) -> Self {
53        self.write_timeout = Some(timeout);
54        self
55    }
56
57    pub fn connect(self) -> Result<LoggingClient> {
58        let mut stream = UnixStream::connect(&self.socket)?;
59        stream.set_read_timeout(self.read_timeout)?;
60        stream.set_write_timeout(self.write_timeout)?;
61
62        let subscribe = PluginSubscribeRequest {
63            filter_mask: self.filter_mask,
64            auth_token: self.auth_token,
65            client_name: self.client_name,
66        };
67        let payload = bincode::serialize(&subscribe)?;
68        write_frame(
69            &mut stream,
70            PLUGIN_FRAME_MAGIC,
71            PLUGIN_PROTOCOL_VERSION,
72            PluginFrameType::Subscribe as u8,
73            &payload,
74        )?;
75
76        let (ft, payload) = read_frame(&mut stream, PLUGIN_FRAME_MAGIC, PLUGIN_PROTOCOL_VERSION)?;
77        match PluginFrameType::from_u8(ft) {
78            Some(PluginFrameType::Ack) => {
79                let _ack: PluginSubscribeAck = bincode::deserialize(&payload)?;
80            }
81            Some(PluginFrameType::Error) => {
82                let err: PluginErrorFrame = bincode::deserialize(&payload)?;
83                return Err(Error::Remote {
84                    code: err.code,
85                    message: err.message,
86                });
87            }
88            _ => {
89                return Err(Error::Protocol("unexpected first frame".to_string()));
90            }
91        }
92
93        Ok(LoggingClient { stream })
94    }
95}
96
/// A subscribed connection that yields plugin events.
///
/// Obtained from [`LoggingClientBuilder::connect`]; call
/// [`LoggingClient::next_event`] to receive events.
#[derive(Debug)]
pub struct LoggingClient {
    /// Socket on which the subscribe handshake has already completed.
    stream: UnixStream,
}
100
101impl LoggingClient {
102    pub fn builder(socket: impl Into<String>) -> LoggingClientBuilder {
103        LoggingClientBuilder::new(socket)
104    }
105
106    pub fn next_event(&mut self) -> Result<PluginEvent> {
107        loop {
108            let (ft, payload) = read_frame(
109                &mut self.stream,
110                PLUGIN_FRAME_MAGIC,
111                PLUGIN_PROTOCOL_VERSION,
112            )?;
113            match PluginFrameType::from_u8(ft) {
114                Some(PluginFrameType::Event) => {
115                    let evt: PluginEvent = bincode::deserialize(&payload)?;
116                    return Ok(evt);
117                }
118                Some(PluginFrameType::Ping) => {
119                    let ping: PluginPingFrame = bincode::deserialize(&payload)?;
120                    let pong = PluginPongFrame {
121                        ts_millis: ping.ts_millis,
122                    };
123                    let payload = bincode::serialize(&pong)?;
124                    write_frame(
125                        &mut self.stream,
126                        PLUGIN_FRAME_MAGIC,
127                        PLUGIN_PROTOCOL_VERSION,
128                        PluginFrameType::Pong as u8,
129                        &payload,
130                    )?;
131                }
132                Some(PluginFrameType::Error) => {
133                    let err: PluginErrorFrame = bincode::deserialize(&payload)?;
134                    return Err(Error::Remote {
135                        code: err.code,
136                        message: err.message,
137                    });
138                }
139                _ => {
140                    return Err(Error::Protocol("unexpected frame".to_string()));
141                }
142            }
143        }
144    }
145}