1pub mod continuous_mode;
3pub mod device_discovery;
5pub mod device_handle;
7
8use paperclip::actix::Apiv2Schema;
9use serde::{Deserialize, Serialize};
10use std::{
11 collections::{hash_map::DefaultHasher, HashMap},
12 hash::{Hash, Hasher},
13 net::{Ipv4Addr, SocketAddrV4, UdpSocket},
14 ops::Deref,
15 time::Duration,
16};
17use tokio::{
18 sync::{mpsc, oneshot},
19 time::sleep,
20};
21
22use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
23use tracing::{error, info, trace, warn};
24use udp_stream::UdpStream;
25use uuid::Uuid;
26
27use super::devices::{DeviceActor, DeviceActorHandler};
28use bluerobotics_ping::device::{Ping1D, Ping360};
29
30pub struct Device {
31 pub id: Uuid,
32 pub source: SourceSelection,
33 pub handler: super::devices::DeviceActorHandler,
34 pub actor: tokio::task::JoinHandle<DeviceActor>,
35 pub broadcast: Option<tokio::task::JoinHandle<()>>,
36 pub status: DeviceStatus,
37 pub device_type: DeviceSelection,
38}
39
40#[derive(Debug, Serialize, Deserialize, Clone)]
41pub struct DeviceInfo {
42 pub id: Uuid,
43 pub source: SourceSelection,
44 pub status: DeviceStatus,
45 pub device_type: DeviceSelection,
46}
47impl Device {
48 pub fn info(&self) -> DeviceInfo {
49 DeviceInfo {
50 id: self.id,
51 source: self.source.clone(),
52 status: self.status.clone(),
53 device_type: self.device_type.clone(),
54 }
55 }
56}
57
58impl Drop for Device {
59 fn drop(&mut self) {
60 trace!(
61 "Removing Device from DeviceManager, details: {:?}",
62 self.info()
63 );
64 self.actor.abort();
65 if let Some(broadcast_handle) = &self.broadcast {
66 trace!("Device broadcast handle closed for: {:?}", self.info().id);
67 broadcast_handle.abort();
68 }
69 }
70}
71
72#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Apiv2Schema)]
73pub enum DeviceSelection {
74 Common,
75 Ping1D,
76 Ping360,
77 Auto,
78}
79
80#[derive(Debug, Clone, Deserialize, Serialize, Hash, Apiv2Schema)]
81pub enum SourceSelection {
82 UdpStream(SourceUdpStruct),
83 SerialStream(SourceSerialStruct),
84}
85
86enum SourceType {
87 Udp(UdpStream),
88 Serial(SerialStream),
89}
90
91#[derive(Clone, Debug, Deserialize, Serialize, Hash, Apiv2Schema)]
92pub struct SourceUdpStruct {
93 pub ip: Ipv4Addr,
94 pub port: u16,
95}
96
97#[derive(Clone, Debug, Deserialize, Serialize, Hash, Apiv2Schema)]
98pub struct SourceSerialStruct {
99 pub path: String,
100 pub baudrate: u32,
101}
102
103#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
104pub enum DeviceStatus {
105 Running,
106 Stopped,
107 ContinuousMode,
108}
109
110pub struct DeviceManager {
111 receiver: mpsc::Receiver<ManagerActorRequest>,
112 pub device: HashMap<Uuid, Device>,
113}
114
115#[derive(Debug)]
116pub struct ManagerActorRequest {
117 pub request: Request,
118 pub respond_to: oneshot::Sender<Result<Answer, ManagerError>>,
119}
120#[derive(Clone)]
121pub struct ManagerActorHandler {
122 pub sender: mpsc::Sender<ManagerActorRequest>,
123}
124
125#[derive(Debug, Serialize, Deserialize, Clone, Apiv2Schema)]
126pub enum Answer {
127 DeviceMessage(DeviceAnswer),
128 #[serde(skip)]
129 InnerDeviceHandler(DeviceActorHandler),
130 DeviceInfo(Vec<DeviceInfo>),
131}
132
133#[derive(Debug, Serialize, Deserialize, Clone)]
134pub enum ManagerError {
135 DeviceNotExist(Uuid),
136 DeviceAlreadyExist(Uuid),
137 DeviceStatus(DeviceStatus, Uuid),
138 DeviceError(super::devices::DeviceError),
139 DeviceSourceError(String),
140 NoDevices,
141 TokioMpsc(String),
142 NotImplemented(Request),
143 Other(String),
144}
145
146#[derive(Debug, Serialize, Deserialize, Clone)]
147pub struct DeviceAnswer {
148 #[serde(flatten)]
149 pub answer: crate::device::devices::PingAnswer,
150 pub device_id: Uuid,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema)]
154#[serde(tag = "command", content = "payload")]
155pub enum Request {
156 AutoCreate,
157 Create(CreateStruct),
158 Delete(UuidWrapper),
159 List,
160 Info(UuidWrapper),
161 Search,
162 Ping(DeviceRequestStruct),
163 GetDeviceHandler(UuidWrapper),
164 ModifyDevice(ModifyDevice),
165 EnableContinuousMode(UuidWrapper),
166 DisableContinuousMode(UuidWrapper),
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema)]
170pub enum ModifyDeviceCommand {
171 Ip(Ipv4Addr),
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema)]
175pub struct ModifyDevice {
176 pub uuid: Uuid,
177 pub modify: ModifyDeviceCommand,
178}
179
180#[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema)]
181pub struct UuidWrapper {
182 pub uuid: Uuid,
183}
184
185impl Deref for UuidWrapper {
186 type Target = Uuid;
187
188 fn deref(&self) -> &Self::Target {
189 &self.uuid
190 }
191}
192#[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema)]
193pub struct CreateStruct {
194 pub source: SourceSelection,
195 pub device_selection: DeviceSelection,
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct DeviceRequestStruct {
200 pub uuid: Uuid,
201 pub device_request: crate::device::devices::PingRequest,
202}
203
204impl DeviceManager {
205 async fn handle_message(&mut self, actor_request: ManagerActorRequest) {
206 trace!("DeviceManager: Received a request, details: {actor_request:?}");
207 match actor_request.request {
208 Request::AutoCreate => {
209 let result = self.auto_create().await;
210 if let Err(e) = actor_request.respond_to.send(result) {
211 error!("DeviceManager: Failed to return AutoCreate response: {e:?}");
212 }
213 }
214 Request::Create(request) => {
215 let result = self.create(request.source, request.device_selection).await;
216 if let Err(e) = actor_request.respond_to.send(result) {
217 error!("DeviceManager: Failed to return Create response: {e:?}");
218 }
219 }
220 Request::Delete(uuid) => {
221 let result = self.delete(*uuid).await;
222 if let Err(e) = actor_request.respond_to.send(result) {
223 error!("DeviceManager: Failed to return Delete response: {e:?}");
224 }
225 }
226 Request::List => {
227 let result = self.list().await;
228 if let Err(e) = actor_request.respond_to.send(result) {
229 error!("DeviceManager: Failed to return List response: {e:?}");
230 }
231 }
232 Request::Info(device_id) => {
233 let result = self.info(*device_id).await;
234 if let Err(e) = actor_request.respond_to.send(result) {
235 error!("DeviceManager: Failed to return Info response: {:?}", e);
236 }
237 }
238 Request::EnableContinuousMode(uuid) => {
239 let result = self.continuous_mode(*uuid).await;
240 if let Err(e) = actor_request.respond_to.send(result) {
241 error!("DeviceManager: Failed to return EnableContinuousMode response: {e:?}");
242 }
243 }
244 Request::DisableContinuousMode(uuid) => {
245 let result = self.continuous_mode_off(*uuid).await;
246 if let Err(e) = actor_request.respond_to.send(result) {
247 error!("DeviceManager: Failed to return DisableContinuousMode response: {e:?}");
248 }
249 }
250 Request::GetDeviceHandler(id) => {
251 let answer = self.get_device_handler(*id).await;
252 if let Err(e) = actor_request.respond_to.send(answer) {
253 error!("DeviceManager: Failed to return GetDeviceHandler response: {e:?}");
254 }
255 }
256 Request::ModifyDevice(request) => {
257 let answer = self.modify_device(request).await;
258 if let Err(err) = actor_request.respond_to.send(answer) {
259 error!("DeviceManager: Failed to return ModifyDevice response: {err:?}");
260 }
261 }
262 _ => {
263 if let Err(e) = actor_request
264 .respond_to
265 .send(Err(ManagerError::NotImplemented(actor_request.request)))
266 {
267 warn!("DeviceManager: Failed to return response: {e:?}");
268 }
269 }
270 }
271 }
272
273 pub fn new(size: usize) -> (Self, ManagerActorHandler) {
274 let (sender, receiver) = mpsc::channel(size);
275 let actor = DeviceManager {
276 receiver,
277 device: HashMap::new(),
278 };
279 let actor_handler = ManagerActorHandler { sender };
280
281 trace!("DeviceManager and handler successfully created: Success");
282 (actor, actor_handler)
283 }
284
285 pub async fn run(mut self) {
286 info!("DeviceManager is running");
287 while let Some(msg) = self.receiver.recv().await {
288 self.update_devices_status().await; self.handle_message(msg).await;
290 }
291 error!("DeviceManager has stopped please check your application");
292 }
293
294 pub async fn update_devices_status(&mut self) {
295 if let Ok(Answer::DeviceInfo(answer)) = self.list().await {
296 for device in answer {
297 if let Some(device_entry) = self.device.get_mut(&device.id) {
298 if device_entry.status == DeviceStatus::Stopped {
299 break;
300 }
301 if device_entry.actor.is_finished() {
302 info!("Device stopped, device id: {device:?}");
303 device_entry.status = DeviceStatus::Stopped;
304 }
305 }
306 }
307 }
308 }
309
310 pub async fn create(
311 &mut self,
312 source: SourceSelection,
313 mut device_selection: DeviceSelection,
314 ) -> Result<Answer, ManagerError> {
315 let mut hasher = DefaultHasher::new();
316 source.hash(&mut hasher);
317 let hash = Uuid::from_u128(hasher.finish().into());
318
319 if self.device.contains_key(&hash) {
320 trace!("Device creation error: Device already exist for provided SourceSelection, details: {source:?}");
321 return Err(ManagerError::DeviceAlreadyExist(hash));
322 }
323
324 let port = match &source {
325 SourceSelection::UdpStream(source_udp_struct) => {
326 let socket_addr = SocketAddrV4::new(source_udp_struct.ip, source_udp_struct.port);
327
328 let udp_stream = UdpStream::connect(socket_addr.into())
329 .await
330 .map_err(|err| ManagerError::DeviceSourceError(err.to_string()))?;
331 SourceType::Udp(udp_stream)
332 }
333 SourceSelection::SerialStream(source_serial_struct) => {
334 let mut serial_stream: SerialStream =
335 tokio_serial::new(&source_serial_struct.path, source_serial_struct.baudrate)
336 .open_native_async()
337 .map_err(|err| ManagerError::DeviceSourceError(err.to_string()))?;
338
339 device_discovery::set_baudrate_pre_routine(
340 &mut serial_stream,
341 source_serial_struct.baudrate,
342 )
343 .await?;
344
345 serial_stream
346 .clear(tokio_serial::ClearBuffer::All)
347 .map_err(|err| ManagerError::DeviceSourceError(err.to_string()))?;
348
349 SourceType::Serial(serial_stream)
350 }
351 };
352
353 let device = match port {
354 SourceType::Udp(udp_port) => match device_selection {
355 DeviceSelection::Common | DeviceSelection::Auto => {
356 crate::device::devices::DeviceType::Common(
357 bluerobotics_ping::common::Device::new(udp_port),
358 )
359 }
360 DeviceSelection::Ping1D => {
361 crate::device::devices::DeviceType::Ping1D(Ping1D::new(udp_port))
362 }
363 DeviceSelection::Ping360 => {
364 crate::device::devices::DeviceType::Ping360(Ping360::new(udp_port))
365 }
366 },
367 SourceType::Serial(serial_port) => match device_selection {
368 DeviceSelection::Common | DeviceSelection::Auto => {
369 crate::device::devices::DeviceType::Common(
370 bluerobotics_ping::common::Device::new(serial_port),
371 )
372 }
373 DeviceSelection::Ping1D => {
374 crate::device::devices::DeviceType::Ping1D(Ping1D::new(serial_port))
375 }
376 DeviceSelection::Ping360 => {
377 crate::device::devices::DeviceType::Ping360(Ping360::new(serial_port))
378 }
379 },
380 };
381
382 let (mut device, handler) = super::devices::DeviceActor::new(device, 10);
383
384 if device_selection == DeviceSelection::Auto {
385 let mut retry_count = 0;
386 let max_retries = 3;
387 let retry_delay = Duration::from_millis(100);
388
389 loop {
390 match device.try_upgrade().await {
391 Ok(super::devices::PingAnswer::UpgradeResult(result)) => {
392 match result {
393 super::devices::UpgradeResult::Unknown => {
394 device_selection = DeviceSelection::Common;
395 }
396 super::devices::UpgradeResult::Ping1D => {
397 device_selection = DeviceSelection::Ping1D;
398 }
399 super::devices::UpgradeResult::Ping360 => {
400 device_selection = DeviceSelection::Ping360;
401 }
402 }
403 break;
404 }
405 Err(err) => {
406 retry_count += 1;
407 if retry_count >= max_retries {
408 error!(
409 "Device creation error: Can't auto upgrade the DeviceType after {} attempts, details: {err:?}",
410 max_retries
411 );
412 return Err(ManagerError::DeviceError(err));
413 }
414
415 warn!(
416 "Device creation error: Device upgrade attempt {} of {} failed: {err:?}. Retrying...",
417 retry_count, max_retries
418 );
419
420 sleep(retry_delay).await;
421 continue;
422 }
423 e => warn!("Device creation error: Abnormal answer: {e:?}."),
424 }
425 }
426 }
427
428 let actor = tokio::spawn(async move { device.run().await });
429
430 let device = Device {
431 id: hash,
432 source,
433 handler,
434 actor,
435 status: DeviceStatus::Running,
436 broadcast: None,
437 device_type: device_selection,
438 };
439
440 self.device.insert(hash, device);
441
442 trace!("Device broadcast enable by default for: {hash:?}");
443 let device_info = self.continuous_mode(hash).await?;
444
445 info!("New device created and available, details: {device_info:?}");
446 Ok(device_info)
447 }
448
449 pub async fn auto_create(&mut self) -> Result<Answer, ManagerError> {
450 let mut results = Vec::new();
451
452 let mut available_source = Vec::new();
453
454 match device_discovery::serial_discovery().await {
455 Some(result) => available_source.extend(result),
456 None => warn!("Auto create: Unable to find available devices on serial ports"),
457 }
458
459 match device_discovery::network_discovery() {
460 Some(result) => available_source.extend(result),
461 None => warn!("Auto create: Unable to find available devices on network"),
462 }
463
464 for source in available_source {
465 match self.create(source.clone(), DeviceSelection::Auto).await {
466 Ok(answer) => match answer {
467 Answer::DeviceInfo(device_info) => {
468 results.extend(device_info);
469 }
470 msg => {
471 warn!("Some unexpected message during auto_create, details: {msg:?}")
472 }
473 },
474 Err(err) => {
475 error!("Failed to create device for source {source:?}: {err:?}");
476 }
477 }
478 }
479
480 Ok(Answer::DeviceInfo(results))
481 }
482
483 pub async fn list(&self) -> Result<Answer, ManagerError> {
484 if self.device.is_empty() {
485 trace!("No devices available for list generation request");
486 return Err(ManagerError::NoDevices);
487 };
488 let mut list = Vec::new();
489 for device in self.device.values() {
490 list.push(device.info())
491 }
492 Ok(Answer::DeviceInfo(list))
493 }
494
495 pub async fn info(&self, device_id: Uuid) -> Result<Answer, ManagerError> {
496 self.check_device_uuid(device_id)?;
497 Ok(Answer::DeviceInfo(vec![self.get_device(device_id)?.info()]))
498 }
499
500 pub async fn delete(&mut self, device_id: Uuid) -> Result<Answer, ManagerError> {
501 match self.device.remove(&device_id) {
502 Some(device) => {
503 let device_info = device.info();
504 drop(device);
505 trace!("Device delete id {device_id:?}: Success",);
506 Ok(Answer::DeviceInfo(vec![device_info]))
507 }
508 None => {
509 error!("Device delete id {device_id:?} : Error, device doesn't exist");
510 Err(ManagerError::DeviceNotExist(device_id))
511 }
512 }
513 }
514
515 pub async fn continuous_mode(&mut self, device_id: Uuid) -> Result<Answer, ManagerError> {
516 self.check_device_status(device_id, &[DeviceStatus::Running])?;
517 let device_type = self.get_device_type(device_id)?;
518
519 let subscriber = self.get_subscriber(device_id).await?;
521
522 let broadcast_handle = self
523 .continuous_mode_start(subscriber, device_id, device_type.clone())
524 .await;
525 if let Some(handle) = &broadcast_handle {
526 if !handle.is_finished() {
527 trace!("Success start_continuous_mode for {device_id:?}");
528 } else {
529 return Err(ManagerError::Other(
530 "Error while start_continuous_mode".to_string(),
531 ));
532 }
533 } else {
534 return Err(ManagerError::Other(
535 "Error while start_continuous_mode".to_string(),
536 ));
537 };
538
539 self.continuous_mode_startup_routine(device_id, device_type)
540 .await?;
541
542 let device = self.get_mut_device(device_id)?;
543 device.broadcast = broadcast_handle;
544 device.status = DeviceStatus::ContinuousMode;
545
546 let updated_device_info = self.get_device(device_id)?.info();
547
548 Ok(Answer::DeviceInfo(vec![updated_device_info]))
549 }
550
551 pub async fn continuous_mode_off(&mut self, device_id: Uuid) -> Result<Answer, ManagerError> {
552 self.check_device_status(device_id, &[DeviceStatus::ContinuousMode])?;
553 let device_type = self.get_device_type(device_id)?;
554
555 let device = self.get_mut_device(device_id)?;
556 if let Some(broadcast) = device.broadcast.take() {
557 broadcast.abort_handle().abort();
558 }
559
560 device.status = DeviceStatus::Running;
561
562 let updated_device_info = device.info();
563
564 self.continuous_mode_shutdown_routine(device_id, device_type)
565 .await?;
566
567 Ok(Answer::DeviceInfo(vec![updated_device_info]))
568 }
569
570 pub async fn modify_device(&mut self, request: ModifyDevice) -> Result<Answer, ManagerError> {
571 match request.modify {
572 ModifyDeviceCommand::Ip(ip) => {
573 let device_info = self.info(request.uuid).await?;
574 let Answer::DeviceInfo(data) = device_info else {
575 return Err(ManagerError::NoDevices);
576 };
577
578 let Some(info) = data.first() else {
579 return Err(ManagerError::NoDevices);
580 };
581
582 let SourceSelection::UdpStream(inner) = &info.source else {
583 return Err(ManagerError::Other(format!(
584 "modify_device : invalid request for device : {request:?}"
585 )));
586 };
587
588 self.modify_device_ip(ip, inner.ip).await?;
589 self.delete(request.uuid).await
590 }
591 }
592 }
593
594 pub async fn modify_device_ip(
595 &mut self,
596 ip: Ipv4Addr,
597 destination: Ipv4Addr,
598 ) -> Result<(), ManagerError> {
599 let socket =
600 UdpSocket::bind("0.0.0.0:0").map_err(|err| ManagerError::Other(err.to_string()))?; socket
602 .set_broadcast(true)
603 .map_err(|err| ManagerError::Other(err.to_string()))?;
604
605 let command = format!("SetSS1IP {}", ip);
606
607 socket
608 .send_to(command.as_bytes(), format!("{destination}:30303"))
609 .map_err(|err| ManagerError::Other(err.to_string()))?;
610 Ok(())
611 }
612}
613
614impl ManagerActorHandler {
615 pub async fn send(&self, request: Request) -> Result<Answer, ManagerError> {
616 let (result_sender, result_receiver) = oneshot::channel();
617
618 match &request {
619 Request::Ping(request) => {
621 trace!("Handling Ping request: {request:?}: Forwarding request to device handler");
622 let get_handler_target = request.uuid;
623 let handler_request =
624 Request::GetDeviceHandler(crate::device::manager::UuidWrapper {
625 uuid: get_handler_target,
626 });
627 let manager_request = ManagerActorRequest {
628 request: handler_request,
629 respond_to: result_sender,
630 };
631 self.sender
632 .send(manager_request)
633 .await
634 .map_err(|err| ManagerError::TokioMpsc(err.to_string()))?;
635 let result = match result_receiver
636 .await
637 .map_err(|err| ManagerError::TokioMpsc(err.to_string()))
638 {
639 Ok(ans) => ans,
640 Err(err) => {
641 error!("DeviceManagerHandler: Failed to receive handler from Manager, details: {err:?}");
642 return Err(err);
643 }
644 };
645
646 match result? {
647 Answer::InnerDeviceHandler(handler) => {
648 trace!(
649 "Handling Ping request: {request:?}: Successfully received the handler"
650 );
651 let result = handler.send(request.device_request.clone()).await;
652 match result {
653 Ok(result) => {
654 info!("Handling Ping request: {request:?}: Success");
655 Ok(Answer::DeviceMessage(DeviceAnswer {
656 answer: result,
657 device_id: request.uuid,
658 }))
659 }
660 Err(err) => {
661 error!(
662 "Handling Ping request: {request:?}: Error ocurred on device: {err:?}" );
663 Err(ManagerError::DeviceError(err))
664 }
665 }
666 }
667 answer => Ok(answer), }
669 }
670 _ => {
671 trace!("Handling DeviceManager request: {request:?}: Forwarding request.");
672 let device_request = ManagerActorRequest {
673 request: request.clone(),
674 respond_to: result_sender,
675 };
676
677 self.sender
678 .send(device_request)
679 .await
680 .map_err(|err| ManagerError::TokioMpsc(err.to_string()))?;
681
682 match result_receiver
683 .await
684 .map_err(|err| ManagerError::TokioMpsc(err.to_string()))?
685 {
686 Ok(ans) => {
687 trace!("Handling DeviceManager request: {request:?}: Success");
688 Ok(ans)
689 }
690 Err(err) => {
691 error!(
692 "Handling DeviceManager request: {request:?}: Error ocurred on manager: {err:?}",
693 );
694 Err(err)
695 }
696 }
697 }
698 }
699 }
700}