1use std::convert::TryInto;
2use std::sync::Arc;
3
4use flume::{unbounded, Sender};
6use futures::{FutureExt, StreamExt};
7use tokio::sync::Mutex;
8use warp::ws::Message;
9
10use warp::{ws::WebSocket, Filter};
12
13use crate::settings;
15use crate::Event;
16use pyrinas_shared::{ota::OtaVersion, ManagmentDataType};
17
18use serde_cbor;
20
21use crate::Error;
23
24pub type AdminClient = Arc<Mutex<Option<Sender<Result<Message, warp::Error>>>>>;
25
26async fn handle_connection(
28 broker_sender: Sender<Event>,
29 websocket: WebSocket,
30 client: AdminClient,
31) {
32 log::debug!("Got stream!");
33
34 if client.lock().await.is_some() {
36 log::warn!("Already connected to admin client!");
37 return;
38 }
39
40 let (ws_tx, mut ws_rx) = websocket.split();
42
43 let (tx, rx) = unbounded();
46 tokio::task::spawn(rx.into_stream().forward(ws_tx).map(|result| {
47 if let Err(e) = result {
48 eprintln!("websocket send error: {}", e);
49 }
50 }));
51
52 {
54 *client.lock().await = Some(tx);
55 }
56
57 while let Some(Ok(msg)) = ws_rx.next().await {
58 log::debug!("msg size: {}", msg.as_bytes().len());
59
60 let req: pyrinas_shared::ManagementData =
62 serde_cbor::from_slice(msg.as_bytes()).expect("Unable to deserialize ManagementData");
63
64 match req.cmd {
66 ManagmentDataType::AddOta => {
67 let ota_update: pyrinas_shared::ota::v2::OtaUpdate =
69 serde_cbor::from_slice(&req.msg).expect("Unable to deserialize OtaUpdate");
70
71 let _ = broker_sender
73 .send_async(Event::OtaNewPackage(ota_update))
74 .await
75 .expect("Unable to send OtaNewPackage to broker.");
76 }
77 ManagmentDataType::LinkOta => {
78 let a: pyrinas_shared::OtaLink =
80 serde_cbor::from_slice(&req.msg).expect("Unable to deserialize OtaLink");
81
82 let ver = match a.ota_version.try_into() {
83 Ok(v) => v,
84 Err(_) => OtaVersion::V2,
85 };
86
87 let _ = broker_sender
89 .send_async(Event::OtaLink {
90 device_id: a.device_id,
91 group_id: a.group_id,
92 image_id: a.image_id,
93 ota_version: ver,
94 })
95 .await
96 .expect("Unable to send OtaNewPackage to broker.");
97 }
98 ManagmentDataType::RemoveOta => {
99 let image_id = match String::from_utf8(req.msg) {
101 Ok(id) => id,
102 Err(_) => {
103 log::warn!("Unable to get image_id!");
104 continue;
105 }
106 };
107
108 let _ = broker_sender
110 .send_async(Event::OtaDeletePackage(image_id))
111 .await
112 .expect("Unable to send OtaNewPackage to broker.");
113 }
114 ManagmentDataType::Application => {
116 broker_sender
117 .send_async(Event::ApplicationManagementRequest(req))
118 .await
119 .expect("Unable to send ApplicationManagementRequest to broker.");
120 }
121 ManagmentDataType::UnlinkOta => {
122 let a: pyrinas_shared::OtaLink =
124 serde_cbor::from_slice(&req.msg).expect("Unable to deserialize OtaLink");
125
126 let _ = broker_sender
128 .send_async(Event::OtaUnlink {
129 device_id: a.device_id,
130 group_id: a.group_id,
131 })
132 .await
133 .expect("Unable to send OtaNewPackage to broker.");
134 }
135 ManagmentDataType::GetGroupList => {
136 broker_sender
137 .send_async(Event::OtaUpdateGroupListRequest())
138 .await
139 .expect("Unable to send ApplicationManagementRequest to broker.");
140 }
141 ManagmentDataType::GetImageList => {
142 broker_sender
143 .send_async(Event::OtaUpdateImageListRequest())
144 .await
145 .expect("Unable to send ApplicationManagementRequest to broker.");
146 }
147 }
148 }
149
150 {
152 *client.lock().await = None;
153 }
154}
155
156pub async fn run(settings: &settings::Admin, broker_sender: Sender<Event>) -> Result<(), Error> {
158 let (sender, receiver) = unbounded::<Event>();
160
161 let client: AdminClient = Default::default();
163 let from_broker_client = client.clone();
164
165 let client_filter = warp::any().map(move || client.clone());
167
168 tokio::task::spawn(async move {
169 let c = from_broker_client;
170
171 while let Ok(event) = receiver.recv_async().await {
172 let data = match event {
173 Event::OtaUpdateImageListRequestResponse(r) => match serde_cbor::to_vec(&r) {
174 Ok(v) => v,
175 Err(_) => {
176 log::warn!("Unable to serialize image list!");
177 continue;
178 }
179 },
180 Event::OtaUpdateGroupListRequestResponse(r) => match serde_cbor::to_vec(&r) {
181 Ok(v) => v,
182 Err(_) => {
183 log::warn!("Unable to serialize group list!");
184 continue;
185 }
186 },
187 Event::ApplicationManagementResponse(r) => match serde_cbor::to_vec(&r) {
188 Ok(v) => v,
189 Err(_) => {
190 log::warn!("Unable to application management response!");
191 continue;
192 }
193 },
194 _ => {
195 log::warn!("Unhandled command sent to admin!");
196 continue;
197 }
198 };
199
200 if let Some(c) = c.lock().await.as_ref() {
201 if let Err(e) = c.send_async(Ok(Message::binary(data))).await {
202 log::error!("Unabe to send message to admin! Err: {}", e);
203 };
204 }
205 }
206 });
207
208 broker_sender
210 .send_async(Event::NewRunner {
211 name: "sock".to_string(),
212 sender: sender.clone(),
213 })
214 .await?;
215
216 let stream = warp::get()
219 .and(warp::path("socket"))
220 .and(warp::header::exact(
221 "ApiKey",
222 Box::leak(settings.api_key.clone().into_boxed_str()),
223 ))
224 .and(warp::ws())
225 .and(client_filter)
226 .map(move |ws: warp::ws::Ws, client: AdminClient| {
227 log::debug!("Before upgrade..");
228
229 let broker_sender = broker_sender.clone();
230
231 ws.on_upgrade(|socket| {
233 handle_connection(broker_sender, socket, client)
235 })
236 });
237
238 warp::serve(stream)
240 .run(([127, 0, 0, 1], settings.port))
241 .await;
242
243 Ok(())
244}
245
246