endpoint_libs/libs/ws/
basics.rs

1use parking_lot::RwLock;
2use serde::*;
3use serde_json::Value;
4use std::fmt::Debug;
5use std::net::SocketAddr;
6use std::sync::atomic::AtomicU64;
7use std::sync::Arc;
8
9use crate::libs::error_code::ErrorCode;
10use crate::libs::handler::RequestHandlerErased;
11use crate::libs::log::LogLevel;
12use crate::libs::toolbox::RequestContext;
13use crate::model::EndpointSchema;
14
15pub type ConnectionId = u32;
16#[derive(Debug, Serialize, Deserialize, Default, Clone)]
17pub struct WsRequestGeneric<Req> {
18    pub method: u32,
19    pub seq: u32,
20    pub params: Req,
21}
22pub type WsRequestValue = WsRequestGeneric<Value>;
23
24#[derive(Debug, Serialize, Deserialize, Default, Clone)]
25pub struct WsResponseError {
26    pub method: u32,
27    pub code: u32,
28    pub seq: u32,
29    pub log_id: String,
30    pub params: Value,
31}
32
33#[derive(Debug)]
34pub struct WsConnection {
35    pub connection_id: ConnectionId,
36    pub user_id: AtomicU64,
37    pub roles: Arc<RwLock<Arc<Vec<u32>>>>,
38    pub address: SocketAddr,
39    pub log_id: u64,
40}
41impl WsConnection {
42    pub fn get_user_id(&self) -> u64 {
43        self.user_id.load(std::sync::atomic::Ordering::Acquire)
44    }
45
46    pub fn get_roles(&self) -> Vec<u32> {
47        let roles = self.roles.read();
48        roles.as_ref().clone()
49    }
50
51    pub fn set_user_id(&self, user_id: u64) {
52        self.user_id
53            .store(user_id, std::sync::atomic::Ordering::Release);
54    }
55
56    pub fn set_roles(&self, roles: Arc<Vec<u32>>) {
57        let mut roles_lock = self.roles.write();
58        *roles_lock = roles;
59    }
60}
61
62pub type WsSuccessResponse = WsSuccessResponseGeneric<Value>;
63pub type WsStreamResponse = WsStreamResponseGeneric<Value>;
64
65#[derive(Debug, Serialize, Deserialize, Clone)]
66pub struct WsForwardedResponse {
67    pub method: u32,
68    pub seq: u32,
69}
70#[derive(Debug, Serialize, Deserialize, Clone)]
71pub struct WsSuccessResponseGeneric<Params> {
72    pub method: u32,
73    pub seq: u32,
74    pub params: Params,
75}
76
77#[derive(Debug, Serialize, Deserialize, Clone)]
78pub struct WsStreamResponseGeneric<Params> {
79    pub original_seq: u32,
80    pub method: u32,
81    pub stream_seq: u32,
82    pub stream_code: u32,
83    pub data: Params,
84}
85#[derive(Debug, Serialize, Deserialize, Clone)]
86pub struct WsLogResponse {
87    pub seq: u32,
88    pub log_id: u64,
89    pub level: LogLevel,
90    pub message: String,
91}
92
93#[derive(Debug, Serialize, Deserialize, Clone)]
94#[serde(tag = "type")]
95pub enum WsResponseGeneric<Resp> {
96    Immediate(WsSuccessResponseGeneric<Resp>),
97    Stream(WsStreamResponseGeneric<Resp>),
98    Error(WsResponseError),
99    Log(WsLogResponse),
100    Forwarded(WsForwardedResponse),
101    Close,
102}
103
104pub type WsResponseValue = WsResponseGeneric<Value>;
105
106pub struct WsEndpoint {
107    pub schema: EndpointSchema,
108    pub handler: Arc<dyn RequestHandlerErased>,
109}
110
111pub fn internal_error_to_resp(
112    ctx: &RequestContext,
113    code: ErrorCode,
114    err0: eyre::Error,
115) -> WsResponseValue {
116    let log_id = ctx.log_id.to_string();
117    let err = WsResponseError {
118        method: ctx.method,
119        code: code.to_u32(),
120        seq: ctx.seq,
121        log_id,
122        params: Value::Null,
123    };
124    tracing::error!("Internal error: {:?} {:?}", err, err0);
125    WsResponseValue::Error(err)
126}
127
128pub fn request_error_to_resp(
129    ctx: &RequestContext,
130    code: ErrorCode,
131    params: impl Into<Value>,
132) -> WsResponseValue {
133    let log_id = ctx.log_id.to_string();
134    let params = params.into();
135    let err = WsResponseError {
136        method: ctx.method,
137        code: code.to_u32(),
138        seq: ctx.seq,
139        log_id,
140        params,
141    };
142    tracing::warn!("Request error: {:?}", err);
143    WsResponseValue::Error(err)
144}