use lark_websocket_protobuf::pbbp2::{Frame, Header};
use log::{debug, error, trace};
use serde::{Deserialize, Serialize};
use std::time::Instant;
use crate::{
client::ws_client::{ClientConfig, WsEvent},
event::dispatcher::EventDispatcherHandler,
};
/// Kind of websocket frame, as carried in `Frame::method`.
///
/// The explicit discriminants are the wire values; obtain them with `as i32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameType {
    /// Control frame (`method == 0`), e.g. ping / pong.
    Control = 0,
    /// Data frame (`method == 1`), e.g. `event` or `card` messages.
    Data = 1,
}

/// Stateless dispatcher for incoming frames and builder for outgoing ones.
#[derive(Debug)]
pub struct FrameHandler;

impl FrameHandler {
    /// Routes an incoming frame to the control or data handler based on `frame.method`.
    ///
    /// `event_handler` and `event_tx` are forwarded to the data-frame handler.
    ///
    /// Returns whatever frame the selected handler produces, or `None` when the
    /// handler produces nothing or `frame.method` is not a [`FrameType`] wire value
    /// (the latter is logged at error level).
    pub async fn handle_frame(
        frame: Frame,
        event_handler: &EventDispatcherHandler,
        event_tx: &tokio::sync::mpsc::UnboundedSender<WsEvent>,
    ) -> Option<Frame> {
        // Named constants so the match patterns stay tied to `FrameType`
        // instead of repeating the raw wire numbers.
        const CONTROL: i32 = FrameType::Control as i32;
        const DATA: i32 = FrameType::Data as i32;

        match frame.method {
            CONTROL => Self::handle_control_frame(frame),
            DATA => Self::handle_data_frame(frame, event_handler, event_tx).await,
            _ => {
                error!("Unknown frame method: {}", frame.method);
                None
            }
        }
    }

    /// Handles a control frame, dispatching on its `type` header.
    ///
    /// Only `pong` is handled; any other type is logged at debug level and yields
    /// `None`. A frame without a `type` header yields `None` without logging.
    fn handle_control_frame(frame: Frame) -> Option<Frame> {
        let headers = &frame.headers;
        let frame_type = Self::get_header_value(headers, "type")?;
        trace!("Received control frame: {frame_type}");

        match frame_type.as_str() {
            "pong" => Self::handle_pong_frame(frame),
            _ => {
                debug!("Unhandled control frame type: {frame_type}");
                None
            }
        }
    }

    /// Validates that a pong frame's payload parses as a JSON `ClientConfig`.
    ///
    /// Returns the original frame unchanged when parsing succeeds. Returns `None`
    /// when the payload is absent (silently) or fails to parse (logged as error).
    // NOTE(review): the parsed config is only logged here; presumably the caller
    // re-reads it from the returned frame — confirm against the call site.
    fn handle_pong_frame(frame: Frame) -> Option<Frame> {
        let payload = frame.payload.as_ref()?;

        match serde_json::from_slice::<ClientConfig>(payload) {
            Ok(config) => {
                debug!("Received pong with config: {config:?}");
                Some(frame)
            }
            Err(e) => {
                error!("Failed to parse ClientConfig from pong frame: {e:?}");
                None
            }
        }
    }

    /// Handles a data frame, dispatching on its `type` header.
    ///
    /// For `event` frames the payload is passed to `event_handler`, and the same
    /// frame is returned with its payload replaced by the JSON-serialized
    /// `NewWsResponse`; a `biz_rt` header is appended when the response carries one.
    /// `card` frames and unknown types are logged at debug level and yield `None`.
    /// A frame without a payload is logged as an error and yields `None`,
    /// regardless of type.
    ///
    /// `_event_tx` is currently unused.
    async fn handle_data_frame(
        mut frame: Frame,
        event_handler: &EventDispatcherHandler,
        _event_tx: &tokio::sync::mpsc::UnboundedSender<WsEvent>,
    ) -> Option<Frame> {
        let headers = &frame.headers;
        // Missing headers fall back to empty strings.
        let msg_type = Self::get_header_value(headers, "type").unwrap_or_default();
        let msg_id = Self::get_header_value(headers, "message_id").unwrap_or_default();
        let trace_id = Self::get_header_value(headers, "trace_id").unwrap_or_default();

        // Take the payload out of the frame; the slot is refilled with the
        // serialized response on the `event` path below.
        let Some(payload) = frame.payload.take() else {
            error!("Data frame missing payload");
            return None;
        };

        debug!(
            "Received data frame - type: {msg_type}, message_id: {msg_id}, trace_id: {trace_id}"
        );

        match msg_type.as_str() {
            "event" => {
                let response = Self::process_event(payload, event_handler).await;

                if let Some(biz_rt) = response.headers.get("biz_rt") {
                    frame.headers.push(Header {
                        key: "biz_rt".to_string(),
                        value: biz_rt.clone(),
                    });
                }

                // On serialization failure the frame is still returned, with an
                // empty payload.
                frame.payload = Some(serde_json::to_vec(&response).unwrap_or_else(|e| {
                    error!("Failed to serialize response: {e:?}");
                    vec![]
                }));
                Some(frame)
            }
            "card" => {
                debug!("Card frame received, skipping");
                None
            }
            _ => {
                debug!("Unknown data frame type: {msg_type}");
                None
            }
        }
    }

    /// Runs the event handler on `payload` and builds the response for it.
    ///
    /// On success the response has code 200 and a `biz_rt` header holding the
    /// handler's elapsed time in milliseconds. On failure the error is logged and
    /// a code-500 response without `biz_rt` is returned.
    // NOTE(review): there is no await point in this body; the handler call runs
    // synchronously inside the calling task.
    async fn process_event(
        payload: Vec<u8>,
        event_handler: &EventDispatcherHandler,
    ) -> NewWsResponse {
        let start = Instant::now();

        match event_handler.do_without_validation(payload) {
            Ok(_) => {
                let elapsed = start.elapsed().as_millis();
                let mut response = NewWsResponse::ok();
                response
                    .headers
                    .insert("biz_rt".to_string(), elapsed.to_string());
                response
            }
            Err(err) => {
                error!("Failed to handle event: {err:?}");
                NewWsResponse::error()
            }
        }
    }

    /// Returns a copy of the value of the first header whose key equals `key`.
    fn get_header_value(headers: &[Header], key: &str) -> Option<String> {
        headers
            .iter()
            .find(|h| h.key == key)
            .map(|h| h.value.clone())
    }

    /// Builds a control frame for `service_id` with a single `type: ping` header
    /// and no payload.
    pub fn build_ping_frame(service_id: i32) -> Frame {
        Frame {
            seq_id: 0,
            log_id: 0,
            service: service_id,
            method: FrameType::Control as i32,
            headers: vec![Header {
                key: "type".to_string(),
                value: "ping".to_string(),
            }],
            payload_encoding: None,
            payload_type: None,
            payload: None,
            log_id_new: None,
        }
    }

    /// Builds a data frame for `service_id` carrying the given `headers` and `payload`.
    pub fn build_response_frame(service_id: i32, headers: Vec<Header>, payload: Vec<u8>) -> Frame {
        Frame {
            seq_id: 0,
            log_id: 0,
            service: service_id,
            method: FrameType::Data as i32,
            headers,
            payload_encoding: None,
            payload_type: None,
            payload: Some(payload),
            log_id_new: None,
        }
    }
}
/// Response body that is JSON-serialized into the payload of the frame returned
/// for an `event` data frame.
#[derive(Serialize, Deserialize, Debug)]
struct NewWsResponse {
    /// Status code: 200 when built by `ok()`, 500 when built by `error()`.
    code: u16,
    /// Response headers; `biz_rt` is inserted here after a successful event.
    headers: std::collections::HashMap<String, String>,
    /// Response data; both constructors in this file leave it empty.
    data: Vec<u8>,
}
impl NewWsResponse {
    /// Creates a success response: code 200, no headers, empty data.
    fn ok() -> Self {
        Self::with_code(200)
    }

    /// Creates a failure response: code 500, no headers, empty data.
    fn error() -> Self {
        Self::with_code(500)
    }

    /// Creates a response with the given status code, no headers and empty data.
    fn with_code(code: u16) -> Self {
        Self {
            code,
            headers: std::collections::HashMap::new(),
            data: Vec::new(),
        }
    }
}