ng_net/actors/app/
session.rs1use std::sync::Arc;
13
14use async_std::sync::Mutex;
15
16use ng_repo::errors::*;
17use ng_repo::log::*;
18
19use crate::app_protocol::*;
20use crate::broker::BROKER;
21use crate::connection::NoiseFSM;
22use crate::types::*;
23use crate::{actor::*, types::ProtocolMessage};
24
25impl AppSessionStart {
26 pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
27 Actor::<AppSessionStart, AppSessionStartResponse>::new_responder(id)
28 }
29}
30
31impl TryFrom<ProtocolMessage> for AppSessionStart {
32 type Error = ProtocolError;
33 fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
34 if let AppMessageContentV0::SessionStart(req) = msg.try_into()? {
35 Ok(req)
36 } else {
37 log_debug!("INVALID AppMessageContentV0::SessionStart");
38 Err(ProtocolError::InvalidValue)
39 }
40 }
41}
42
43impl From<AppSessionStart> for ProtocolMessage {
44 fn from(request: AppSessionStart) -> ProtocolMessage {
45 AppMessageContentV0::SessionStart(request).into()
46 }
47}
48
49impl TryFrom<ProtocolMessage> for AppSessionStartResponse {
50 type Error = ProtocolError;
51 fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
52 if let AppMessageContentV0::Response(AppResponse::V0(AppResponseV0::SessionStart(res))) =
53 msg.try_into()?
54 {
55 Ok(res)
56 } else {
57 log_debug!("INVALID AppSessionStartResponse");
58 Err(ProtocolError::InvalidValue)
59 }
60 }
61}
62
63impl From<AppSessionStartResponse> for AppMessage {
64 fn from(response: AppSessionStartResponse) -> AppMessage {
65 AppResponse::V0(AppResponseV0::SessionStart(response)).into()
66 }
67}
68
69impl From<AppSessionStartResponse> for ProtocolMessage {
70 fn from(response: AppSessionStartResponse) -> ProtocolMessage {
71 response.into()
72 }
73}
74
75impl Actor<'_, AppSessionStart, AppSessionStartResponse> {}
76
77#[async_trait::async_trait]
78impl EActor for Actor<'_, AppSessionStart, AppSessionStartResponse> {
79 async fn respond(
80 &mut self,
81 msg: ProtocolMessage,
82 fsm: Arc<Mutex<NoiseFSM>>,
83 ) -> Result<(), ProtocolError> {
84 let req = AppSessionStart::try_from(msg)?;
85 let res = {
86 let lock = fsm.lock().await;
87 let remote = lock.remote_peer();
88
89 if remote.is_none() {
92 Err(ServerError::BrokerError)
93 } else {
94 let (sb, broker_id) = {
95 let b = BROKER.read().await;
96 (b.get_server_broker()?, b.get_server_peer_id())
97 };
98 let lock = sb.read().await;
99 lock.app_session_start(req, remote.unwrap(), broker_id)
100 .await
101 }
102 };
103 let app_message: AppMessage = match res {
104 Err(e) => e.into(),
105 Ok(o) => o.into(),
106 };
107 fsm.lock()
108 .await
109 .send_in_reply_to(app_message.into(), self.id())
110 .await?;
111 Ok(())
112 }
113}
114
115impl AppSessionStop {
118 pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
119 Actor::<AppSessionStop, EmptyAppResponse>::new_responder(id)
120 }
121}
122
123impl TryFrom<ProtocolMessage> for AppSessionStop {
124 type Error = ProtocolError;
125 fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
126 if let AppMessageContentV0::SessionStop(req) = msg.try_into()? {
127 Ok(req)
128 } else {
129 log_debug!("INVALID AppMessageContentV0::SessionStop");
130 Err(ProtocolError::InvalidValue)
131 }
132 }
133}
134
135impl TryFrom<ProtocolMessage> for EmptyAppResponse {
136 type Error = ProtocolError;
137 fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
138 let res: Result<AppMessageContentV0, ProtocolError> = msg.try_into();
139 if let AppMessageContentV0::EmptyResponse = res? {
140 Ok(EmptyAppResponse(()))
141 } else {
142 log_debug!("INVALID AppMessageContentV0::EmptyResponse");
143 Err(ProtocolError::InvalidValue)
144 }
145 }
146}
147
148impl From<AppSessionStop> for ProtocolMessage {
149 fn from(request: AppSessionStop) -> ProtocolMessage {
150 AppMessageContentV0::SessionStop(request).into()
151 }
152}
153
154impl From<Result<EmptyAppResponse, ServerError>> for ProtocolMessage {
155 fn from(res: Result<EmptyAppResponse, ServerError>) -> ProtocolMessage {
156 match res {
157 Ok(_a) => ServerError::Ok.into(),
158 Err(err) => AppMessage::V0(AppMessageV0 {
159 id: 0,
160 result: err.into(),
161 content: AppMessageContentV0::EmptyResponse,
162 }),
163 }
164 .into()
165 }
166}
167
168impl Actor<'_, AppSessionStop, EmptyAppResponse> {}
169
170#[async_trait::async_trait]
171impl EActor for Actor<'_, AppSessionStop, EmptyAppResponse> {
172 async fn respond(
173 &mut self,
174 msg: ProtocolMessage,
175 fsm: Arc<Mutex<NoiseFSM>>,
176 ) -> Result<(), ProtocolError> {
177 let req = AppSessionStop::try_from(msg)?;
178 let res = {
179 let lock = fsm.lock().await;
180 let remote = lock.remote_peer();
181
182 if remote.is_none() {
183 Err(ServerError::BrokerError)
184 } else {
185 let sb = { BROKER.read().await.get_server_broker()? };
186 let lock = sb.read().await;
187 lock.app_session_stop(req, remote.as_ref().unwrap()).await
188 }
189 };
190
191 fsm.lock()
192 .await
193 .send_in_reply_to(res.into(), self.id())
194 .await?;
195 Ok(())
196 }
197}