endpoint_libs/libs/ws/
basics.rs1use parking_lot::RwLock;
2use serde::*;
3use serde_json::Value;
4use std::fmt::Debug;
5use std::net::SocketAddr;
6use std::sync::atomic::AtomicU64;
7use std::sync::Arc;
8
9use crate::libs::error_code::ErrorCode;
10use crate::libs::handler::RequestHandlerErased;
11use crate::libs::log::LogLevel;
12use crate::libs::toolbox::RequestContext;
13use crate::model::EndpointSchema;
14
15pub type ConnectionId = u32;
16#[derive(Debug, Serialize, Deserialize, Default, Clone)]
17pub struct WsRequestGeneric<Req> {
18 pub method: u32,
19 pub seq: u32,
20 pub params: Req,
21}
22pub type WsRequestValue = WsRequestGeneric<Value>;
23
24#[derive(Debug, Serialize, Deserialize, Default, Clone)]
25pub struct WsResponseError {
26 pub method: u32,
27 pub code: u32,
28 pub seq: u32,
29 pub log_id: String,
30 pub params: Value,
31}
32
33#[derive(Debug)]
34pub struct WsConnection {
35 pub connection_id: ConnectionId,
36 pub user_id: AtomicU64,
37 pub roles: Arc<RwLock<Arc<Vec<u32>>>>,
38 pub address: SocketAddr,
39 pub log_id: u64,
40}
41impl WsConnection {
42 pub fn get_user_id(&self) -> u64 {
43 self.user_id.load(std::sync::atomic::Ordering::Acquire)
44 }
45
46 pub fn get_roles(&self) -> Vec<u32> {
47 let roles = self.roles.read();
48 roles.as_ref().clone()
49 }
50
51 pub fn set_user_id(&self, user_id: u64) {
52 self.user_id
53 .store(user_id, std::sync::atomic::Ordering::Release);
54 }
55
56 pub fn set_roles(&self, roles: Arc<Vec<u32>>) {
57 let mut roles_lock = self.roles.write();
58 *roles_lock = roles;
59 }
60}
61
62pub type WsSuccessResponse = WsSuccessResponseGeneric<Value>;
63pub type WsStreamResponse = WsStreamResponseGeneric<Value>;
64
65#[derive(Debug, Serialize, Deserialize, Clone)]
66pub struct WsForwardedResponse {
67 pub method: u32,
68 pub seq: u32,
69}
70#[derive(Debug, Serialize, Deserialize, Clone)]
71pub struct WsSuccessResponseGeneric<Params> {
72 pub method: u32,
73 pub seq: u32,
74 pub params: Params,
75}
76
77#[derive(Debug, Serialize, Deserialize, Clone)]
78pub struct WsStreamResponseGeneric<Params> {
79 pub original_seq: u32,
80 pub method: u32,
81 pub stream_seq: u32,
82 pub stream_code: u32,
83 pub data: Params,
84}
85#[derive(Debug, Serialize, Deserialize, Clone)]
86pub struct WsLogResponse {
87 pub seq: u32,
88 pub log_id: u64,
89 pub level: LogLevel,
90 pub message: String,
91}
92
93#[derive(Debug, Serialize, Deserialize, Clone)]
94#[serde(tag = "type")]
95pub enum WsResponseGeneric<Resp> {
96 Immediate(WsSuccessResponseGeneric<Resp>),
97 Stream(WsStreamResponseGeneric<Resp>),
98 Error(WsResponseError),
99 Log(WsLogResponse),
100 Forwarded(WsForwardedResponse),
101 Close,
102}
103
104pub type WsResponseValue = WsResponseGeneric<Value>;
105
106pub struct WsEndpoint {
107 pub schema: EndpointSchema,
108 pub handler: Arc<dyn RequestHandlerErased>,
109}
110
111pub fn internal_error_to_resp(
112 ctx: &RequestContext,
113 code: ErrorCode,
114 err0: eyre::Error,
115) -> WsResponseValue {
116 let log_id = ctx.log_id.to_string();
117 let err = WsResponseError {
118 method: ctx.method,
119 code: code.to_u32(),
120 seq: ctx.seq,
121 log_id,
122 params: Value::Null,
123 };
124 tracing::error!("Internal error: {:?} {:?}", err, err0);
125 WsResponseValue::Error(err)
126}
127
128pub fn request_error_to_resp(
129 ctx: &RequestContext,
130 code: ErrorCode,
131 params: impl Into<Value>,
132) -> WsResponseValue {
133 let log_id = ctx.log_id.to_string();
134 let params = params.into();
135 let err = WsResponseError {
136 method: ctx.method,
137 code: code.to_u32(),
138 seq: ctx.seq,
139 log_id,
140 params,
141 };
142 tracing::warn!("Request error: {:?}", err);
143 WsResponseValue::Error(err)
144}