pyrinas_server/
admin.rs

1use std::convert::TryInto;
2use std::sync::Arc;
3
4// async Related
5use flume::{unbounded, Sender};
6use futures::{FutureExt, StreamExt};
7use tokio::sync::Mutex;
8use warp::ws::Message;
9
10// Unix listener
11use warp::{ws::WebSocket, Filter};
12
13// Local lib related
14use crate::settings;
15use crate::Event;
16use pyrinas_shared::{ota::OtaVersion, ManagmentDataType};
17
18// Cbor
19use serde_cbor;
20
21// Error
22use crate::Error;
23
24pub type AdminClient = Arc<Mutex<Option<Sender<Result<Message, warp::Error>>>>>;
25
26// Handle the incoming connection
27async fn handle_connection(
28    broker_sender: Sender<Event>,
29    websocket: WebSocket,
30    client: AdminClient,
31) {
32    log::debug!("Got stream!");
33
34    // Ensure only one admin connection
35    if client.lock().await.is_some() {
36        log::warn!("Already connected to admin client!");
37        return;
38    }
39
40    // Make a connection
41    let (ws_tx, mut ws_rx) = websocket.split();
42
43    // Use an unbounded channel to handle buffering and flushing of messages
44    // to the websocket...
45    let (tx, rx) = unbounded();
46    tokio::task::spawn(rx.into_stream().forward(ws_tx).map(|result| {
47        if let Err(e) = result {
48            eprintln!("websocket send error: {}", e);
49        }
50    }));
51
52    // Handle tx portion of things..
53    {
54        *client.lock().await = Some(tx);
55    }
56
57    while let Some(Ok(msg)) = ws_rx.next().await {
58        log::debug!("msg size: {}", msg.as_bytes().len());
59
60        // First deocde into ManagementRequest struct
61        let req: pyrinas_shared::ManagementData =
62            serde_cbor::from_slice(msg.as_bytes()).expect("Unable to deserialize ManagementData");
63
64        // Next step in the managment request process
65        match req.cmd {
66            ManagmentDataType::AddOta => {
67                // Dedcode ota update
68                let ota_update: pyrinas_shared::ota::v2::OtaUpdate =
69                    serde_cbor::from_slice(&req.msg).expect("Unable to deserialize OtaUpdate");
70
71                // Send if decode was successful
72                let _ = broker_sender
73                    .send_async(Event::OtaNewPackage(ota_update))
74                    .await
75                    .expect("Unable to send OtaNewPackage to broker.");
76            }
77            ManagmentDataType::LinkOta => {
78                // Dedcode ota update
79                let a: pyrinas_shared::OtaLink =
80                    serde_cbor::from_slice(&req.msg).expect("Unable to deserialize OtaLink");
81
82                let ver = match a.ota_version.try_into() {
83                    Ok(v) => v,
84                    Err(_) => OtaVersion::V2,
85                };
86
87                // Send if decode was successful
88                let _ = broker_sender
89                    .send_async(Event::OtaLink {
90                        device_id: a.device_id,
91                        group_id: a.group_id,
92                        image_id: a.image_id,
93                        ota_version: ver,
94                    })
95                    .await
96                    .expect("Unable to send OtaNewPackage to broker.");
97            }
98            ManagmentDataType::RemoveOta => {
99                // Dedcode ota update
100                let image_id = match String::from_utf8(req.msg) {
101                    Ok(id) => id,
102                    Err(_) => {
103                        log::warn!("Unable to get image_id!");
104                        continue;
105                    }
106                };
107
108                // Send if decode was successful
109                let _ = broker_sender
110                    .send_async(Event::OtaDeletePackage(image_id))
111                    .await
112                    .expect("Unable to send OtaNewPackage to broker.");
113            }
114            // Otherwise send all others to application
115            ManagmentDataType::Application => {
116                broker_sender
117                    .send_async(Event::ApplicationManagementRequest(req))
118                    .await
119                    .expect("Unable to send ApplicationManagementRequest to broker.");
120            }
121            ManagmentDataType::UnlinkOta => {
122                // Dedcode ota update
123                let a: pyrinas_shared::OtaLink =
124                    serde_cbor::from_slice(&req.msg).expect("Unable to deserialize OtaLink");
125
126                // Send if decode was successful
127                let _ = broker_sender
128                    .send_async(Event::OtaUnlink {
129                        device_id: a.device_id,
130                        group_id: a.group_id,
131                    })
132                    .await
133                    .expect("Unable to send OtaNewPackage to broker.");
134            }
135            ManagmentDataType::GetGroupList => {
136                broker_sender
137                    .send_async(Event::OtaUpdateGroupListRequest())
138                    .await
139                    .expect("Unable to send ApplicationManagementRequest to broker.");
140            }
141            ManagmentDataType::GetImageList => {
142                broker_sender
143                    .send_async(Event::OtaUpdateImageListRequest())
144                    .await
145                    .expect("Unable to send ApplicationManagementRequest to broker.");
146            }
147        }
148    }
149
150    // Handle tx portion of things..
151    {
152        *client.lock().await = None;
153    }
154}
155
156// Only requires a sender. No response necessary here... yet.
157pub async fn run(settings: &settings::Admin, broker_sender: Sender<Event>) -> Result<(), Error> {
158    // Get the sender/reciever associated with this particular task
159    let (sender, receiver) = unbounded::<Event>();
160
161    // Handle reciever end
162    let client: AdminClient = Default::default();
163    let from_broker_client = client.clone();
164
165    // Client filter
166    let client_filter = warp::any().map(move || client.clone());
167
168    tokio::task::spawn(async move {
169        let c = from_broker_client;
170
171        while let Ok(event) = receiver.recv_async().await {
172            let data = match event {
173                Event::OtaUpdateImageListRequestResponse(r) => match serde_cbor::to_vec(&r) {
174                    Ok(v) => v,
175                    Err(_) => {
176                        log::warn!("Unable to serialize image list!");
177                        continue;
178                    }
179                },
180                Event::OtaUpdateGroupListRequestResponse(r) => match serde_cbor::to_vec(&r) {
181                    Ok(v) => v,
182                    Err(_) => {
183                        log::warn!("Unable to serialize group list!");
184                        continue;
185                    }
186                },
187                Event::ApplicationManagementResponse(r) => match serde_cbor::to_vec(&r) {
188                    Ok(v) => v,
189                    Err(_) => {
190                        log::warn!("Unable to application management response!");
191                        continue;
192                    }
193                },
194                _ => {
195                    log::warn!("Unhandled command sent to admin!");
196                    continue;
197                }
198            };
199
200            if let Some(c) = c.lock().await.as_ref() {
201                if let Err(e) = c.send_async(Ok(Message::binary(data))).await {
202                    log::error!("Unabe to send message to admin! Err: {}", e);
203                };
204            }
205        }
206    });
207
208    // Register this task
209    broker_sender
210        .send_async(Event::NewRunner {
211            name: "sock".to_string(),
212            sender: sender.clone(),
213        })
214        .await?;
215
216    // ! Important: this leaks the api_key into the exact function. As long as this is only called once
217    // it's NBD.
218    let stream = warp::get()
219        .and(warp::path("socket"))
220        .and(warp::header::exact(
221            "ApiKey",
222            Box::leak(settings.api_key.clone().into_boxed_str()),
223        ))
224        .and(warp::ws())
225        .and(client_filter)
226        .map(move |ws: warp::ws::Ws, client: AdminClient| {
227            log::debug!("Before upgrade..");
228
229            let broker_sender = broker_sender.clone();
230
231            // And then our closure will be called when it completes...
232            ws.on_upgrade(|socket| {
233                // handle the connection
234                handle_connection(broker_sender, socket, client)
235            })
236        });
237
238    // Run the `warp` server
239    warp::serve(stream)
240        .run(([127, 0, 0, 1], settings.port))
241        .await;
242
243    Ok(())
244}
245
246// TODO: (test) try to send an "other" managment_request (gets forwarded to the application.)
247// TODO: (test) try to send an "add_ota" command