ping_viewer_next/device/manager/
mod.rs

/// Specially for DeviceManager to retrieve checks and structures from Devices stored in its hashmap collection
2pub mod continuous_mode;
3/// Specially for auto creation methods, from UDP or serial port
4pub mod device_discovery;
5/// Specially for continuous_mode methods, startup, shutdown, handle and errors routines for each device type
6pub mod device_handle;
7
8use paperclip::actix::Apiv2Schema;
9use serde::{Deserialize, Serialize};
10use std::{
11    collections::{hash_map::DefaultHasher, HashMap},
12    hash::{Hash, Hasher},
13    net::{Ipv4Addr, SocketAddrV4, UdpSocket},
14    ops::Deref,
15    time::Duration,
16};
17use tokio::{
18    sync::{mpsc, oneshot},
19    time::sleep,
20};
21
22use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
23use tracing::{error, info, trace, warn};
24use udp_stream::UdpStream;
25use uuid::Uuid;
26
27use super::devices::{DeviceActor, DeviceActorHandler};
28use bluerobotics_ping::device::{Ping1D, Ping360};
29
30pub struct Device {
31    pub id: Uuid,
32    pub source: SourceSelection,
33    pub handler: super::devices::DeviceActorHandler,
34    pub actor: tokio::task::JoinHandle<DeviceActor>,
35    pub broadcast: Option<tokio::task::JoinHandle<()>>,
36    pub status: DeviceStatus,
37    pub device_type: DeviceSelection,
38}
39
40#[derive(Debug, Serialize, Deserialize, Clone)]
41pub struct DeviceInfo {
42    pub id: Uuid,
43    pub source: SourceSelection,
44    pub status: DeviceStatus,
45    pub device_type: DeviceSelection,
46}
47impl Device {
48    pub fn info(&self) -> DeviceInfo {
49        DeviceInfo {
50            id: self.id,
51            source: self.source.clone(),
52            status: self.status.clone(),
53            device_type: self.device_type.clone(),
54        }
55    }
56}
57
58impl Drop for Device {
59    fn drop(&mut self) {
60        trace!(
61            "Removing Device from DeviceManager, details: {:?}",
62            self.info()
63        );
64        self.actor.abort();
65        if let Some(broadcast_handle) = &self.broadcast {
66            trace!("Device broadcast handle closed for: {:?}", self.info().id);
67            broadcast_handle.abort();
68        }
69    }
70}
71
72#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Apiv2Schema)]
73pub enum DeviceSelection {
74    Common,
75    Ping1D,
76    Ping360,
77    Auto,
78}
79
80#[derive(Debug, Clone, Deserialize, Serialize, Hash, Apiv2Schema)]
81pub enum SourceSelection {
82    UdpStream(SourceUdpStruct),
83    SerialStream(SourceSerialStruct),
84}
85
86enum SourceType {
87    Udp(UdpStream),
88    Serial(SerialStream),
89}
90
91#[derive(Clone, Debug, Deserialize, Serialize, Hash, Apiv2Schema)]
92pub struct SourceUdpStruct {
93    pub ip: Ipv4Addr,
94    pub port: u16,
95}
96
97#[derive(Clone, Debug, Deserialize, Serialize, Hash, Apiv2Schema)]
98pub struct SourceSerialStruct {
99    pub path: String,
100    pub baudrate: u32,
101}
102
103#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
104pub enum DeviceStatus {
105    Running,
106    Stopped,
107    ContinuousMode,
108}
109
110pub struct DeviceManager {
111    receiver: mpsc::Receiver<ManagerActorRequest>,
112    pub device: HashMap<Uuid, Device>,
113}
114
115#[derive(Debug)]
116pub struct ManagerActorRequest {
117    pub request: Request,
118    pub respond_to: oneshot::Sender<Result<Answer, ManagerError>>,
119}
120#[derive(Clone)]
121pub struct ManagerActorHandler {
122    pub sender: mpsc::Sender<ManagerActorRequest>,
123}
124
125#[derive(Debug, Serialize, Deserialize, Clone, Apiv2Schema)]
126pub enum Answer {
127    DeviceMessage(DeviceAnswer),
128    #[serde(skip)]
129    InnerDeviceHandler(DeviceActorHandler),
130    DeviceInfo(Vec<DeviceInfo>),
131}
132
133#[derive(Debug, Serialize, Deserialize, Clone)]
134pub enum ManagerError {
135    DeviceNotExist(Uuid),
136    DeviceAlreadyExist(Uuid),
137    DeviceStatus(DeviceStatus, Uuid),
138    DeviceError(super::devices::DeviceError),
139    DeviceSourceError(String),
140    NoDevices,
141    TokioMpsc(String),
142    NotImplemented(Request),
143    Other(String),
144}
145
146#[derive(Debug, Serialize, Deserialize, Clone)]
147pub struct DeviceAnswer {
148    #[serde(flatten)]
149    pub answer: crate::device::devices::PingAnswer,
150    pub device_id: Uuid,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema)]
154#[serde(tag = "command", content = "payload")]
155pub enum Request {
156    AutoCreate,
157    Create(CreateStruct),
158    Delete(UuidWrapper),
159    List,
160    Info(UuidWrapper),
161    Search,
162    Ping(DeviceRequestStruct),
163    GetDeviceHandler(UuidWrapper),
164    ModifyDevice(ModifyDevice),
165    EnableContinuousMode(UuidWrapper),
166    DisableContinuousMode(UuidWrapper),
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema)]
170pub enum ModifyDeviceCommand {
171    Ip(Ipv4Addr),
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema)]
175pub struct ModifyDevice {
176    pub uuid: Uuid,
177    pub modify: ModifyDeviceCommand,
178}
179
180#[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema)]
181pub struct UuidWrapper {
182    pub uuid: Uuid,
183}
184
185impl Deref for UuidWrapper {
186    type Target = Uuid;
187
188    fn deref(&self) -> &Self::Target {
189        &self.uuid
190    }
191}
192#[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema)]
193pub struct CreateStruct {
194    pub source: SourceSelection,
195    pub device_selection: DeviceSelection,
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct DeviceRequestStruct {
200    pub uuid: Uuid,
201    pub device_request: crate::device::devices::PingRequest,
202}
203
204impl DeviceManager {
205    async fn handle_message(&mut self, actor_request: ManagerActorRequest) {
206        trace!("DeviceManager: Received a request, details: {actor_request:?}");
207        match actor_request.request {
208            Request::AutoCreate => {
209                let result = self.auto_create().await;
210                if let Err(e) = actor_request.respond_to.send(result) {
211                    error!("DeviceManager: Failed to return AutoCreate response: {e:?}");
212                }
213            }
214            Request::Create(request) => {
215                let result = self.create(request.source, request.device_selection).await;
216                if let Err(e) = actor_request.respond_to.send(result) {
217                    error!("DeviceManager: Failed to return Create response: {e:?}");
218                }
219            }
220            Request::Delete(uuid) => {
221                let result = self.delete(*uuid).await;
222                if let Err(e) = actor_request.respond_to.send(result) {
223                    error!("DeviceManager: Failed to return Delete response: {e:?}");
224                }
225            }
226            Request::List => {
227                let result = self.list().await;
228                if let Err(e) = actor_request.respond_to.send(result) {
229                    error!("DeviceManager: Failed to return List response: {e:?}");
230                }
231            }
232            Request::Info(device_id) => {
233                let result = self.info(*device_id).await;
234                if let Err(e) = actor_request.respond_to.send(result) {
235                    error!("DeviceManager: Failed to return Info response: {:?}", e);
236                }
237            }
238            Request::EnableContinuousMode(uuid) => {
239                let result = self.continuous_mode(*uuid).await;
240                if let Err(e) = actor_request.respond_to.send(result) {
241                    error!("DeviceManager: Failed to return EnableContinuousMode response: {e:?}");
242                }
243            }
244            Request::DisableContinuousMode(uuid) => {
245                let result = self.continuous_mode_off(*uuid).await;
246                if let Err(e) = actor_request.respond_to.send(result) {
247                    error!("DeviceManager: Failed to return DisableContinuousMode response: {e:?}");
248                }
249            }
250            Request::GetDeviceHandler(id) => {
251                let answer = self.get_device_handler(*id).await;
252                if let Err(e) = actor_request.respond_to.send(answer) {
253                    error!("DeviceManager: Failed to return GetDeviceHandler response: {e:?}");
254                }
255            }
256            Request::ModifyDevice(request) => {
257                let answer = self.modify_device(request).await;
258                if let Err(err) = actor_request.respond_to.send(answer) {
259                    error!("DeviceManager: Failed to return ModifyDevice response: {err:?}");
260                }
261            }
262            _ => {
263                if let Err(e) = actor_request
264                    .respond_to
265                    .send(Err(ManagerError::NotImplemented(actor_request.request)))
266                {
267                    warn!("DeviceManager: Failed to return response: {e:?}");
268                }
269            }
270        }
271    }
272
273    pub fn new(size: usize) -> (Self, ManagerActorHandler) {
274        let (sender, receiver) = mpsc::channel(size);
275        let actor = DeviceManager {
276            receiver,
277            device: HashMap::new(),
278        };
279        let actor_handler = ManagerActorHandler { sender };
280
281        trace!("DeviceManager and handler successfully created: Success");
282        (actor, actor_handler)
283    }
284
285    pub async fn run(mut self) {
286        info!("DeviceManager is running");
287        while let Some(msg) = self.receiver.recv().await {
288            self.update_devices_status().await; // Todo: move to an outer process
289            self.handle_message(msg).await;
290        }
291        error!("DeviceManager has stopped please check your application");
292    }
293
294    pub async fn update_devices_status(&mut self) {
295        if let Ok(Answer::DeviceInfo(answer)) = self.list().await {
296            for device in answer {
297                if let Some(device_entry) = self.device.get_mut(&device.id) {
298                    if device_entry.status == DeviceStatus::Stopped {
299                        break;
300                    }
301                    if device_entry.actor.is_finished() {
302                        info!("Device stopped, device id: {device:?}");
303                        device_entry.status = DeviceStatus::Stopped;
304                    }
305                }
306            }
307        }
308    }
309
310    pub async fn create(
311        &mut self,
312        source: SourceSelection,
313        mut device_selection: DeviceSelection,
314    ) -> Result<Answer, ManagerError> {
315        let mut hasher = DefaultHasher::new();
316        source.hash(&mut hasher);
317        let hash = Uuid::from_u128(hasher.finish().into());
318
319        if self.device.contains_key(&hash) {
320            trace!("Device creation error: Device already exist for provided SourceSelection, details: {source:?}");
321            return Err(ManagerError::DeviceAlreadyExist(hash));
322        }
323
324        let port = match &source {
325            SourceSelection::UdpStream(source_udp_struct) => {
326                let socket_addr = SocketAddrV4::new(source_udp_struct.ip, source_udp_struct.port);
327
328                let udp_stream = UdpStream::connect(socket_addr.into())
329                    .await
330                    .map_err(|err| ManagerError::DeviceSourceError(err.to_string()))?;
331                SourceType::Udp(udp_stream)
332            }
333            SourceSelection::SerialStream(source_serial_struct) => {
334                let mut serial_stream: SerialStream =
335                    tokio_serial::new(&source_serial_struct.path, source_serial_struct.baudrate)
336                        .open_native_async()
337                        .map_err(|err| ManagerError::DeviceSourceError(err.to_string()))?;
338
339                device_discovery::set_baudrate_pre_routine(
340                    &mut serial_stream,
341                    source_serial_struct.baudrate,
342                )
343                .await?;
344
345                serial_stream
346                    .clear(tokio_serial::ClearBuffer::All)
347                    .map_err(|err| ManagerError::DeviceSourceError(err.to_string()))?;
348
349                SourceType::Serial(serial_stream)
350            }
351        };
352
353        let device = match port {
354            SourceType::Udp(udp_port) => match device_selection {
355                DeviceSelection::Common | DeviceSelection::Auto => {
356                    crate::device::devices::DeviceType::Common(
357                        bluerobotics_ping::common::Device::new(udp_port),
358                    )
359                }
360                DeviceSelection::Ping1D => {
361                    crate::device::devices::DeviceType::Ping1D(Ping1D::new(udp_port))
362                }
363                DeviceSelection::Ping360 => {
364                    crate::device::devices::DeviceType::Ping360(Ping360::new(udp_port))
365                }
366            },
367            SourceType::Serial(serial_port) => match device_selection {
368                DeviceSelection::Common | DeviceSelection::Auto => {
369                    crate::device::devices::DeviceType::Common(
370                        bluerobotics_ping::common::Device::new(serial_port),
371                    )
372                }
373                DeviceSelection::Ping1D => {
374                    crate::device::devices::DeviceType::Ping1D(Ping1D::new(serial_port))
375                }
376                DeviceSelection::Ping360 => {
377                    crate::device::devices::DeviceType::Ping360(Ping360::new(serial_port))
378                }
379            },
380        };
381
382        let (mut device, handler) = super::devices::DeviceActor::new(device, 10);
383
384        if device_selection == DeviceSelection::Auto {
385            let mut retry_count = 0;
386            let max_retries = 3;
387            let retry_delay = Duration::from_millis(100);
388
389            loop {
390                match device.try_upgrade().await {
391                    Ok(super::devices::PingAnswer::UpgradeResult(result)) => {
392                        match result {
393                            super::devices::UpgradeResult::Unknown => {
394                                device_selection = DeviceSelection::Common;
395                            }
396                            super::devices::UpgradeResult::Ping1D => {
397                                device_selection = DeviceSelection::Ping1D;
398                            }
399                            super::devices::UpgradeResult::Ping360 => {
400                                device_selection = DeviceSelection::Ping360;
401                            }
402                        }
403                        break;
404                    }
405                    Err(err) => {
406                        retry_count += 1;
407                        if retry_count >= max_retries {
408                            error!(
409                                "Device creation error: Can't auto upgrade the DeviceType after {} attempts, details: {err:?}",
410                                max_retries
411                            );
412                            return Err(ManagerError::DeviceError(err));
413                        }
414
415                        warn!(
416                            "Device creation error: Device upgrade attempt {} of {} failed: {err:?}. Retrying...",
417                            retry_count, max_retries
418                        );
419
420                        sleep(retry_delay).await;
421                        continue;
422                    }
423                    e => warn!("Device creation error: Abnormal answer: {e:?}."),
424                }
425            }
426        }
427
428        let actor = tokio::spawn(async move { device.run().await });
429
430        let device = Device {
431            id: hash,
432            source,
433            handler,
434            actor,
435            status: DeviceStatus::Running,
436            broadcast: None,
437            device_type: device_selection,
438        };
439
440        self.device.insert(hash, device);
441
442        trace!("Device broadcast enable by default for: {hash:?}");
443        let device_info = self.continuous_mode(hash).await?;
444
445        info!("New device created and available, details: {device_info:?}");
446        Ok(device_info)
447    }
448
449    pub async fn auto_create(&mut self) -> Result<Answer, ManagerError> {
450        let mut results = Vec::new();
451
452        let mut available_source = Vec::new();
453
454        match device_discovery::serial_discovery().await {
455            Some(result) => available_source.extend(result),
456            None => warn!("Auto create: Unable to find available devices on serial ports"),
457        }
458
459        match device_discovery::network_discovery() {
460            Some(result) => available_source.extend(result),
461            None => warn!("Auto create: Unable to find available devices on network"),
462        }
463
464        for source in available_source {
465            match self.create(source.clone(), DeviceSelection::Auto).await {
466                Ok(answer) => match answer {
467                    Answer::DeviceInfo(device_info) => {
468                        results.extend(device_info);
469                    }
470                    msg => {
471                        warn!("Some unexpected message during auto_create, details: {msg:?}")
472                    }
473                },
474                Err(err) => {
475                    error!("Failed to create device for source {source:?}: {err:?}");
476                }
477            }
478        }
479
480        Ok(Answer::DeviceInfo(results))
481    }
482
483    pub async fn list(&self) -> Result<Answer, ManagerError> {
484        if self.device.is_empty() {
485            trace!("No devices available for list generation request");
486            return Err(ManagerError::NoDevices);
487        };
488        let mut list = Vec::new();
489        for device in self.device.values() {
490            list.push(device.info())
491        }
492        Ok(Answer::DeviceInfo(list))
493    }
494
495    pub async fn info(&self, device_id: Uuid) -> Result<Answer, ManagerError> {
496        self.check_device_uuid(device_id)?;
497        Ok(Answer::DeviceInfo(vec![self.get_device(device_id)?.info()]))
498    }
499
500    pub async fn delete(&mut self, device_id: Uuid) -> Result<Answer, ManagerError> {
501        match self.device.remove(&device_id) {
502            Some(device) => {
503                let device_info = device.info();
504                drop(device);
505                trace!("Device delete id {device_id:?}: Success",);
506                Ok(Answer::DeviceInfo(vec![device_info]))
507            }
508            None => {
509                error!("Device delete id {device_id:?} : Error, device doesn't exist");
510                Err(ManagerError::DeviceNotExist(device_id))
511            }
512        }
513    }
514
515    pub async fn continuous_mode(&mut self, device_id: Uuid) -> Result<Answer, ManagerError> {
516        self.check_device_status(device_id, &[DeviceStatus::Running])?;
517        let device_type = self.get_device_type(device_id)?;
518
519        // Get an inner subscriber for device's stream
520        let subscriber = self.get_subscriber(device_id).await?;
521
522        let broadcast_handle = self
523            .continuous_mode_start(subscriber, device_id, device_type.clone())
524            .await;
525        if let Some(handle) = &broadcast_handle {
526            if !handle.is_finished() {
527                trace!("Success start_continuous_mode for {device_id:?}");
528            } else {
529                return Err(ManagerError::Other(
530                    "Error while start_continuous_mode".to_string(),
531                ));
532            }
533        } else {
534            return Err(ManagerError::Other(
535                "Error while start_continuous_mode".to_string(),
536            ));
537        };
538
539        self.continuous_mode_startup_routine(device_id, device_type)
540            .await?;
541
542        let device = self.get_mut_device(device_id)?;
543        device.broadcast = broadcast_handle;
544        device.status = DeviceStatus::ContinuousMode;
545
546        let updated_device_info = self.get_device(device_id)?.info();
547
548        Ok(Answer::DeviceInfo(vec![updated_device_info]))
549    }
550
551    pub async fn continuous_mode_off(&mut self, device_id: Uuid) -> Result<Answer, ManagerError> {
552        self.check_device_status(device_id, &[DeviceStatus::ContinuousMode])?;
553        let device_type = self.get_device_type(device_id)?;
554
555        let device = self.get_mut_device(device_id)?;
556        if let Some(broadcast) = device.broadcast.take() {
557            broadcast.abort_handle().abort();
558        }
559
560        device.status = DeviceStatus::Running;
561
562        let updated_device_info = device.info();
563
564        self.continuous_mode_shutdown_routine(device_id, device_type)
565            .await?;
566
567        Ok(Answer::DeviceInfo(vec![updated_device_info]))
568    }
569
570    pub async fn modify_device(&mut self, request: ModifyDevice) -> Result<Answer, ManagerError> {
571        match request.modify {
572            ModifyDeviceCommand::Ip(ip) => {
573                let device_info = self.info(request.uuid).await?;
574                let Answer::DeviceInfo(data) = device_info else {
575                    return Err(ManagerError::NoDevices);
576                };
577
578                let Some(info) = data.first() else {
579                    return Err(ManagerError::NoDevices);
580                };
581
582                let SourceSelection::UdpStream(inner) = &info.source else {
583                    return Err(ManagerError::Other(format!(
584                        "modify_device : invalid request for device : {request:?}"
585                    )));
586                };
587
588                self.modify_device_ip(ip, inner.ip).await?;
589                self.delete(request.uuid).await
590            }
591        }
592    }
593
594    pub async fn modify_device_ip(
595        &mut self,
596        ip: Ipv4Addr,
597        destination: Ipv4Addr,
598    ) -> Result<(), ManagerError> {
599        let socket =
600            UdpSocket::bind("0.0.0.0:0").map_err(|err| ManagerError::Other(err.to_string()))?; // Bind to any available port
601        socket
602            .set_broadcast(true)
603            .map_err(|err| ManagerError::Other(err.to_string()))?;
604
605        let command = format!("SetSS1IP {}", ip);
606
607        socket
608            .send_to(command.as_bytes(), format!("{destination}:30303"))
609            .map_err(|err| ManagerError::Other(err.to_string()))?;
610        Ok(())
611    }
612}
613
614impl ManagerActorHandler {
615    pub async fn send(&self, request: Request) -> Result<Answer, ManagerError> {
616        let (result_sender, result_receiver) = oneshot::channel();
617
618        match &request {
619            // Devices requests are forwarded directly to device and let manager handle other incoming request.
620            Request::Ping(request) => {
621                trace!("Handling Ping request: {request:?}: Forwarding request to device handler");
622                let get_handler_target = request.uuid;
623                let handler_request =
624                    Request::GetDeviceHandler(crate::device::manager::UuidWrapper {
625                        uuid: get_handler_target,
626                    });
627                let manager_request = ManagerActorRequest {
628                    request: handler_request,
629                    respond_to: result_sender,
630                };
631                self.sender
632                    .send(manager_request)
633                    .await
634                    .map_err(|err| ManagerError::TokioMpsc(err.to_string()))?;
635                let result = match result_receiver
636                    .await
637                    .map_err(|err| ManagerError::TokioMpsc(err.to_string()))
638                {
639                    Ok(ans) => ans,
640                    Err(err) => {
641                        error!("DeviceManagerHandler: Failed to receive handler from Manager, details: {err:?}");
642                        return Err(err);
643                    }
644                };
645
646                match result? {
647                    Answer::InnerDeviceHandler(handler) => {
648                        trace!(
649                            "Handling Ping request: {request:?}: Successfully received the handler"
650                        );
651                        let result = handler.send(request.device_request.clone()).await;
652                        match result {
653                            Ok(result) => {
654                                info!("Handling Ping request: {request:?}: Success");
655                                Ok(Answer::DeviceMessage(DeviceAnswer {
656                                    answer: result,
657                                    device_id: request.uuid,
658                                }))
659                            }
660                            Err(err) => {
661                                error!(
662                                    "Handling Ping request: {request:?}: Error ocurred on device: {err:?}"                                );
663                                Err(ManagerError::DeviceError(err))
664                            }
665                        }
666                    }
667                    answer => Ok(answer), //should be unreachable
668                }
669            }
670            _ => {
671                trace!("Handling DeviceManager request: {request:?}: Forwarding request.");
672                let device_request = ManagerActorRequest {
673                    request: request.clone(),
674                    respond_to: result_sender,
675                };
676
677                self.sender
678                    .send(device_request)
679                    .await
680                    .map_err(|err| ManagerError::TokioMpsc(err.to_string()))?;
681
682                match result_receiver
683                    .await
684                    .map_err(|err| ManagerError::TokioMpsc(err.to_string()))?
685                {
686                    Ok(ans) => {
687                        trace!("Handling DeviceManager request: {request:?}: Success");
688                        Ok(ans)
689                    }
690                    Err(err) => {
691                        error!(
692                            "Handling DeviceManager request: {request:?}: Error ocurred on manager: {err:?}",
693                        );
694                        Err(err)
695                    }
696                }
697            }
698        }
699    }
700}