// endpoint_libs/libs/ws/basics.rs

use serde::*;
use serde_json::Value;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;

use crate::libs::error_code::ErrorCode;
use crate::libs::handler::RequestHandlerErased;
use crate::libs::log::LogLevel;
use crate::libs::toolbox::RequestContext;
use crate::model::EndpointSchema;

/// Identifier type used for `WsConnection::connection_id`.
pub type ConnectionId = u32;
15#[derive(Debug, Serialize, Deserialize, Default, Clone)]
16pub struct WsRequestGeneric<Req> {
17    pub method: u32,
18    pub seq: u32,
19    pub params: Req,
20}
21pub type WsRequestValue = WsRequestGeneric<Value>;
22
23#[derive(Debug, Serialize, Deserialize, Default, Clone)]
24pub struct WsResponseError {
25    pub method: u32,
26    pub code: u32,
27    pub seq: u32,
28    pub log_id: String,
29    pub params: Value,
30}
31
32#[derive(Debug)]
33pub struct WsConnection {
34    pub connection_id: ConnectionId,
35    pub user_id: AtomicU64,
36    pub role: AtomicU32,
37    pub address: SocketAddr,
38    pub log_id: u64,
39}
40impl WsConnection {
41    pub fn get_user_id(&self) -> u64 {
42        self.user_id.load(std::sync::atomic::Ordering::Acquire)
43    }
44
45    pub fn get_role(&self) -> u32 {
46        self.role.load(std::sync::atomic::Ordering::Acquire)
47    }
48}
49
50pub type WsSuccessResponse = WsSuccessResponseGeneric<Value>;
51pub type WsStreamResponse = WsStreamResponseGeneric<Value>;
52
53#[derive(Debug, Serialize, Deserialize, Clone)]
54pub struct WsForwardedResponse {
55    pub method: u32,
56    pub seq: u32,
57}
58#[derive(Debug, Serialize, Deserialize, Clone)]
59pub struct WsSuccessResponseGeneric<Params> {
60    pub method: u32,
61    pub seq: u32,
62    pub params: Params,
63}
64
65#[derive(Debug, Serialize, Deserialize, Clone)]
66pub struct WsStreamResponseGeneric<Params> {
67    pub original_seq: u32,
68    pub method: u32,
69    pub stream_seq: u32,
70    pub stream_code: u32,
71    pub data: Params,
72}
73#[derive(Debug, Serialize, Deserialize, Clone)]
74pub struct WsLogResponse {
75    pub seq: u32,
76    pub log_id: u64,
77    pub level: LogLevel,
78    pub message: String,
79}
80
81#[derive(Debug, Serialize, Deserialize, Clone)]
82#[serde(tag = "type")]
83pub enum WsResponseGeneric<Resp> {
84    Immediate(WsSuccessResponseGeneric<Resp>),
85    Stream(WsStreamResponseGeneric<Resp>),
86    Error(WsResponseError),
87    Log(WsLogResponse),
88    Forwarded(WsForwardedResponse),
89    Close,
90}
91
92pub type WsResponseValue = WsResponseGeneric<Value>;
93
94pub struct WsEndpoint {
95    pub schema: EndpointSchema,
96    pub handler: Arc<dyn RequestHandlerErased>,
97}
98
99pub fn internal_error_to_resp(
100    ctx: &RequestContext,
101    code: ErrorCode,
102    err0: eyre::Error,
103) -> WsResponseValue {
104    let log_id = ctx.log_id.to_string();
105    let err = WsResponseError {
106        method: ctx.method,
107        code: code.to_u32(),
108        seq: ctx.seq,
109        log_id,
110        params: Value::Null,
111    };
112    tracing::error!("Internal error: {:?} {:?}", err, err0);
113    WsResponseValue::Error(err)
114}
115
116pub fn request_error_to_resp(
117    ctx: &RequestContext,
118    code: ErrorCode,
119    params: impl Into<Value>,
120) -> WsResponseValue {
121    let log_id = ctx.log_id.to_string();
122    let params = params.into();
123    let err = WsResponseError {
124        method: ctx.method,
125        code: code.to_u32(),
126        seq: ctx.seq,
127        log_id,
128        params,
129    };
130    tracing::warn!("Request error: {:?}", err);
131    WsResponseValue::Error(err)
132}