1use std::future::Future;
22
23use astarte_interfaces::{MappingPath, interface::Retention, mapping::path::MappingPathError};
24use chrono::{DateTime, Utc};
25use tracing::{debug, error, info, trace, warn};
26
27use crate::{
28 Error,
29 event::DeviceEvent,
30 state::{ClientState, ConnStatus},
31 store::wrapper::StoreWrapper,
32 types::AstarteData,
33 validate::{ValidatedIndividual, ValidatedObject},
34};
35use crate::{
36 aggregate::AstarteObject,
37 error::{AggregationError, InterfaceTypeError},
38 logging::security::{SecurityEvent, notify_security_event},
39 retention::{
40 Id, RetentionId, StoredRetention, StoredRetentionExt,
41 memory::{ItemValue, VolatileItemError},
42 stored_mark_unsent, volatile_mark_unsent,
43 },
44 store::StoreCapabilities,
45 transport::{
46 Connection, Publish,
47 mqtt::{Mqtt, error::MqttError},
48 },
49};
50use crate::{error::DynError, transport::Disconnect};
51
52mod individual;
53mod introspection;
54mod object;
55mod property;
56
57#[non_exhaustive]
59#[derive(thiserror::Error, Debug)]
60pub enum RecvError {
61 #[error("connection error, {0:?}")]
65 Connection(#[source] DynError),
66 #[error("invalid mapping path")]
68 InvalidEndpoint(#[from] MappingPathError),
69 #[error("couldn't find interface '{name}'")]
71 InterfaceNotFound {
72 name: String,
74 },
75 #[error("couldn't find mapping {mapping} in interface {interface}")]
77 MappingNotFound {
78 interface: String,
80 mapping: String,
82 },
83 #[error("message received with missing explicit timestamp on {interface_name}{path}")]
85 MissingTimestamp {
86 interface_name: String,
88 path: String,
90 },
91 #[error("unset received on property {interface_name}{path} without allow_unset")]
93 Unset {
94 interface_name: String,
96 path: String,
98 },
99 #[error("invalid aggregation between interface and data")]
101 Aggregation(#[from] AggregationError),
102 #[error("invalid interface type between interface and data")]
104 InterfaceType(#[from] InterfaceTypeError),
105 #[error("received data on a device owned interface {interface}{path}")]
107 Ownership {
108 interface: String,
110 path: String,
112 },
113 #[error("disconnected from astarte")]
117 Disconnected,
118}
119
120impl RecvError {
122 pub(crate) fn mqtt_connection_error(value: MqttError) -> Self {
123 RecvError::Connection(value.into())
124 }
125
126 #[cfg(feature = "message-hub")]
127 pub(crate) fn grpc_connection_error(value: crate::transport::grpc::GrpcError) -> Self {
128 RecvError::Connection(value.into())
129 }
130}
131
132pub trait Client: Clone {
137 fn send_individual(
167 &mut self,
168 interface_name: &str,
169 mapping_path: &str,
170 data: AstarteData,
171 ) -> impl Future<Output = Result<(), Error>> + Send;
172
173 fn send_individual_with_timestamp(
204 &mut self,
205 interface_name: &str,
206 mapping_path: &str,
207 data: AstarteData,
208 timestamp: chrono::DateTime<chrono::Utc>,
209 ) -> impl Future<Output = Result<(), Error>> + Send;
210
211 fn send_object(
217 &mut self,
218 interface_name: &str,
219 base_path: &str,
220 data: AstarteObject,
221 ) -> impl Future<Output = Result<(), Error>> + Send;
222
223 fn send_object_with_timestamp(
267 &mut self,
268 interface_name: &str,
269 base_path: &str,
270 data: AstarteObject,
271 timestamp: chrono::DateTime<chrono::Utc>,
272 ) -> impl Future<Output = Result<(), Error>> + Send;
273
274 fn set_property(
304 &mut self,
305 interface_name: &str,
306 mapping_path: &str,
307 data: AstarteData,
308 ) -> impl Future<Output = Result<(), Error>> + Send;
309
310 fn unset_property(
339 &mut self,
340 interface_name: &str,
341 mapping_path: &str,
342 ) -> impl Future<Output = Result<(), Error>> + Send;
343
344 fn recv(&self) -> impl Future<Output = Result<DeviceEvent, RecvError>> + Send;
351}
352
353pub trait ClientDisconnect {
355 fn disconnect(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
357}
358
359#[derive(Debug)]
366pub struct DeviceClient<C>
367where
368 C: Connection,
369{
370 sender: C::Sender,
372 events: async_channel::Receiver<Result<DeviceEvent, RecvError>>,
376 pub(crate) disconnect: async_channel::Sender<()>,
377 pub(crate) store: StoreWrapper<C::Store>,
378 pub(crate) state: ClientState,
379}
380
381impl<C> DeviceClient<C>
382where
383 C: Connection,
384{
385 pub(crate) fn new(
386 sender: C::Sender,
387 rx: async_channel::Receiver<Result<DeviceEvent, RecvError>>,
388 store: StoreWrapper<C::Store>,
389 state: ClientState,
390 disconnect: async_channel::Sender<()>,
391 ) -> Self {
392 Self {
393 sender,
394 events: rx,
395 store,
396 state,
397 disconnect,
398 }
399 }
400
401 async fn send<T>(
402 state: &ClientState,
403 store: &StoreWrapper<C::Store>,
404 sender: &mut C::Sender,
405 data: T,
406 ) -> Result<(), Error>
407 where
408 T: ClientPacket + TryInto<ItemValue, Error = VolatileItemError> + Clone,
409 C::Store: StoreCapabilities,
410 C::Sender: Publish,
411 {
412 match state.connection().await {
413 ConnStatus::Connected => {
414 trace!("publish while connection is connected");
415 }
416 ConnStatus::Disconnected => {
417 trace!("publish while connection is offline");
418 return Self::offline_send(state, store, sender, data).await;
419 }
420 ConnStatus::Closed => {
421 trace!("publish while connection is closed");
422
423 if let Err(error) = Self::offline_send(state, store, sender, data).await {
424 error!(%error, "couldn't store the send");
425 }
426
427 return Err(Error::Disconnected);
428 }
429 }
430
431 match data.get_retention() {
432 Retention::Volatile { .. } => Self::send_volatile(state, sender, data).await,
433 Retention::Stored { .. } => Self::send_stored(state, store, sender, data).await,
434 Retention::Discard => data.send(sender).await,
435 }
436 }
437
438 async fn offline_send<T>(
439 state: &ClientState,
440 store: &StoreWrapper<C::Store>,
441 sender: &mut C::Sender,
442 data: T,
443 ) -> Result<(), Error>
444 where
445 T: ClientPacket + TryInto<ItemValue, Error = VolatileItemError>,
446 C::Store: StoreCapabilities,
447 C::Sender: Publish,
448 {
449 match data.get_retention() {
450 Retention::Discard => {
451 debug!("drop publish with retention discard since disconnected");
452 }
453 Retention::Volatile { .. } => {
454 let id = state.retention_ctx().next();
455
456 state.volatile_store().push_unsent(id, data).await;
457 }
458 Retention::Stored { .. } => {
459 let id = state.retention_ctx().next();
460
461 if let Some(retention) = store.get_retention() {
462 data.store_publish(&id, sender, retention, false).await?;
463 } else {
464 warn!(
465 ?store,
466 "storing interface with retention 'Stored' in volatile store since the store doesn't support retention"
467 );
468 state.volatile_store().push_unsent(id, data).await;
469 }
470 }
471 }
472
473 Ok(())
474 }
475
476 async fn send_stored<T>(
477 state: &ClientState,
478 store: &StoreWrapper<C::Store>,
479 sender: &mut C::Sender,
480 data: T,
481 ) -> Result<(), Error>
482 where
483 T: ClientPacket + TryInto<ItemValue, Error = VolatileItemError> + Clone,
484 C::Store: StoreCapabilities,
485 C::Sender: Publish,
486 {
487 let Some(retention) = store.get_retention() else {
488 warn!(
489 ?store,
490 "storing interface with retention 'Stored' in volatile store since the store doesn't support retention"
491 );
492 return Self::send_volatile(state, sender, data).await;
493 };
494
495 let id = state.retention_ctx().next();
497
498 data.store_publish(&id, sender, retention, true).await?;
499
500 let result = data.send_stored(RetentionId::Stored(id), sender).await;
501
502 if result.is_err() {
503 error!("error while sending stored marking unsent");
504 stored_mark_unsent(store, &id).await;
505 }
506
507 result
508 }
509
510 async fn send_volatile<T>(
511 state: &ClientState,
512 sender: &mut C::Sender,
513 data: T,
514 ) -> Result<(), Error>
515 where
516 T: ClientPacket + TryInto<ItemValue, Error = VolatileItemError> + Clone,
517 C::Store: StoreCapabilities,
518 C::Sender: Publish,
519 {
520 let id = state.retention_ctx().next();
521
522 state.volatile_store().push_sent(id, data.clone()).await;
523
524 let result = data.send_stored(RetentionId::Volatile(id), sender).await;
525
526 if result.is_err() {
527 error!("error while sending volatile marking unsent");
528 volatile_mark_unsent(state.volatile_store(), &id).await;
529 }
530
531 result
532 }
533}
534
535impl<S> DeviceClient<Mqtt<S>>
536where
537 S: StoreCapabilities,
538{
539 pub async fn get_cert_expiry(&self) -> Option<DateTime<Utc>> {
541 self.state.cert_expiry().await
542 }
543
544 pub async fn is_valid_at(&self, check_dt: DateTime<Utc>) -> Option<bool> {
548 let expiry = self.get_cert_expiry().await?;
549
550 if check_dt < expiry {
551 Some(true)
552 } else {
553 notify_security_event(SecurityEvent::CertificateAboutToExpire);
554
555 Some(false)
556 }
557 }
558}
559
560impl<C> Clone for DeviceClient<C>
562where
563 C: Connection,
564{
565 fn clone(&self) -> Self {
566 Self {
567 sender: self.sender.clone(),
568 events: self.events.clone(),
569 store: self.store.clone(),
570 state: self.state.clone(),
571 disconnect: self.disconnect.clone(),
572 }
573 }
574}
575
576impl<C> Client for DeviceClient<C>
577where
578 C: Connection,
579 C::Sender: Publish,
580{
581 async fn send_object_with_timestamp(
582 &mut self,
583 interface_name: &str,
584 base_path: &str,
585 data: AstarteObject,
586 timestamp: chrono::DateTime<chrono::Utc>,
587 ) -> Result<(), Error> {
588 let path = MappingPath::try_from(base_path)?;
589
590 self.send_datastream_object(interface_name, &path, data, Some(timestamp))
591 .await
592 }
593
594 async fn send_object(
595 &mut self,
596 interface_name: &str,
597 base_path: &str,
598 data: AstarteObject,
599 ) -> Result<(), Error> {
600 let path = MappingPath::try_from(base_path)?;
601
602 self.send_datastream_object(interface_name, &path, data, None)
603 .await
604 }
605
606 async fn send_individual(
607 &mut self,
608 interface_name: &str,
609 mapping_path: &str,
610 data: AstarteData,
611 ) -> Result<(), Error> {
612 let path = MappingPath::try_from(mapping_path)?;
613
614 self.send_datastream_individual(interface_name, &path, data, None)
615 .await
616 }
617
618 async fn send_individual_with_timestamp(
619 &mut self,
620 interface_name: &str,
621 mapping_path: &str,
622 data: AstarteData,
623 timestamp: chrono::DateTime<chrono::Utc>,
624 ) -> Result<(), Error> {
625 let mapping = MappingPath::try_from(mapping_path)?;
626
627 self.send_datastream_individual(interface_name, &mapping, data, Some(timestamp))
628 .await
629 }
630
631 async fn set_property(
632 &mut self,
633 interface_name: &str,
634 mapping_path: &str,
635 data: AstarteData,
636 ) -> Result<(), Error> {
637 trace!("setting property {}{}", interface_name, mapping_path);
638
639 let path = MappingPath::try_from(mapping_path)?;
640
641 self.send_property(interface_name, &path, data).await
642 }
643
644 async fn unset_property(
645 &mut self,
646 interface_name: &str,
647 mapping_path: &str,
648 ) -> Result<(), Error> {
649 trace!("unsetting {}{}", interface_name, mapping_path);
650
651 let path = MappingPath::try_from(mapping_path)?;
652
653 self.send_unset(interface_name, &path).await
654 }
655
656 async fn recv(&self) -> Result<DeviceEvent, RecvError> {
657 self.events
658 .recv()
659 .await
660 .map_err(|_| RecvError::Disconnected)?
661 }
662}
663
664impl<C> ClientDisconnect for DeviceClient<C>
665where
666 C: Connection,
667 C::Sender: Disconnect,
668{
669 async fn disconnect(&mut self) -> Result<(), Error> {
670 if self.state.connection().await == ConnStatus::Closed {
671 debug!("connection already closed");
672
673 return Ok(());
674 }
675
676 self.sender.disconnect().await?;
677
678 info!("device disconnected");
679
680 if let Err(error) = self.disconnect.try_send(()) {
681 error!(%error, "multiple clients trying to disconnect");
682 }
683
684 Ok(())
685 }
686}
687
688trait ClientPacket {
689 fn get_retention(&self) -> Retention;
690
691 fn serialize<S>(&self, sender: &S) -> Result<Vec<u8>, crate::Error>
692 where
693 S: Publish;
694
695 fn send<S>(self, sender: &mut S) -> impl Future<Output = Result<(), crate::Error>> + Send
696 where
697 S: Publish + Send;
698
699 fn send_stored<S>(
700 self,
701 id: RetentionId,
702 sender: &mut S,
703 ) -> impl Future<Output = Result<(), crate::Error>> + Send
704 where
705 S: Publish + Send;
706
707 fn store_publish<S, R>(
708 &self,
709 id: &Id,
710 sender: &S,
711 retention: &R,
712 sent: bool,
713 ) -> impl Future<Output = Result<(), crate::Error>> + Send
714 where
715 S: Publish + Sync,
716 R: StoredRetention + Sync;
717}
718
719impl ClientPacket for ValidatedIndividual {
720 fn get_retention(&self) -> Retention {
721 self.retention
722 }
723
724 fn serialize<S>(&self, sender: &S) -> Result<Vec<u8>, crate::Error>
725 where
726 S: Publish,
727 {
728 sender.serialize_individual(self)
729 }
730
731 async fn send<S>(self, sender: &mut S) -> Result<(), crate::Error>
732 where
733 S: Publish + Send,
734 {
735 sender.send_individual(self).await
736 }
737
738 async fn send_stored<S>(self, id: RetentionId, sender: &mut S) -> Result<(), crate::Error>
739 where
740 S: Publish,
741 {
742 sender.send_individual_stored(id, self).await
743 }
744
745 async fn store_publish<S, R>(
746 &self,
747 id: &Id,
748 sender: &S,
749 retention: &R,
750 sent: bool,
751 ) -> Result<(), crate::Error>
752 where
753 S: Publish + Sync,
754 R: StoredRetention + Sync,
755 {
756 let serialized = self.serialize(sender)?;
757
758 retention
759 .store_publish_individual(id, self, &serialized, sent)
760 .await
761 .map_err(crate::Error::from)
762 }
763}
764
765impl ClientPacket for ValidatedObject {
766 fn get_retention(&self) -> Retention {
767 self.retention
768 }
769
770 fn serialize<S>(&self, sender: &S) -> Result<Vec<u8>, crate::Error>
771 where
772 S: Publish,
773 {
774 sender.serialize_object(self)
775 }
776
777 async fn send<S>(self, sender: &mut S) -> Result<(), crate::Error>
778 where
779 S: Publish + Send,
780 {
781 sender.send_object(self).await
782 }
783
784 async fn send_stored<S>(self, id: RetentionId, sender: &mut S) -> Result<(), crate::Error>
785 where
786 S: Publish + Send,
787 {
788 sender.send_object_stored(id, self).await
789 }
790
791 async fn store_publish<S, R>(
792 &self,
793 id: &Id,
794 sender: &S,
795 retention: &R,
796 sent: bool,
797 ) -> Result<(), crate::Error>
798 where
799 S: Publish + Sync,
800 R: StoredRetention + Sync,
801 {
802 let serialized = self.serialize(sender)?;
803
804 retention
805 .store_publish_object(id, self, &serialized, sent)
806 .await
807 .map_err(crate::Error::from)
808 }
809}
810
811#[cfg(test)]
812pub(crate) mod tests {
813 use std::ops::{Deref, DerefMut};
814 use std::str::FromStr;
815 use std::sync::Arc;
816
817 use astarte_interfaces::Interface;
818 use chrono::Utc;
819 use mockall::Sequence;
820 use pretty_assertions::assert_eq;
821
822 use crate::Value;
823 use crate::builder::{Config, DEFAULT_CHANNEL_SIZE, DEFAULT_VOLATILE_CAPACITY};
824 use crate::interfaces::Interfaces;
825 use crate::retention::memory::VolatileStore;
826 use crate::state::SharedState;
827 use crate::store::StoreCapabilities;
828 use crate::store::memory::MemoryStore;
829 use crate::transport::mock::{MockCon, MockSender};
830
831 use super::*;
832
833 pub(crate) struct TestClient<S>
834 where
835 S: StoreCapabilities,
836 {
837 client: DeviceClient<MockCon<S>>,
838 pub(crate) disconnect: async_channel::Receiver<()>,
839 pub(crate) events: async_channel::Sender<Result<DeviceEvent, RecvError>>,
840 }
841
842 impl<S> Deref for TestClient<S>
843 where
844 S: StoreCapabilities,
845 {
846 type Target = DeviceClient<MockCon<S>>;
847
848 fn deref(&self) -> &Self::Target {
849 &self.client
850 }
851 }
852
853 impl<S> DerefMut for TestClient<S>
854 where
855 S: StoreCapabilities,
856 {
857 fn deref_mut(&mut self) -> &mut Self::Target {
858 &mut self.client
859 }
860 }
861
862 pub(crate) fn mock_client(
863 interfaces: &[&str],
864 initial_status: ConnStatus,
865 ) -> TestClient<MemoryStore> {
866 mock_client_with_store(interfaces, initial_status, MemoryStore::new())
867 }
868
869 pub(crate) fn mock_client_with_store<S>(
870 interfaces: &[&str],
871 initial_status: ConnStatus,
872 store: S,
873 ) -> TestClient<S>
874 where
875 S: StoreCapabilities,
876 {
877 let interfaces = interfaces.iter().map(|i| Interface::from_str(i).unwrap());
878 let interfaces = Interfaces::from_iter(interfaces);
879
880 let sender = MockSender::new();
881 let (events_tx, events_rx) = async_channel::bounded(DEFAULT_CHANNEL_SIZE.get());
882 let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
883
884 let mut state = SharedState::new(
885 Config::default(),
886 interfaces,
887 VolatileStore::with_capacity(DEFAULT_VOLATILE_CAPACITY.get()),
888 );
889
890 *state.status.get_mut() = initial_status;
891
892 let client = DeviceClient::new(
893 sender,
894 events_rx,
895 StoreWrapper::new(store),
896 ClientState::new(Arc::new(state)),
897 disconnect_tx,
898 );
899
900 TestClient {
901 client,
902 disconnect: disconnect_rx,
903 events: events_tx,
904 }
905 }
906
907 #[test]
908 fn client_must_be_clone() {
909 let mut client = mock_client(&[], ConnStatus::Connected);
910
911 let mut seq = Sequence::new();
912 client
913 .sender
914 .expect_clone()
915 .once()
916 .in_sequence(&mut seq)
917 .returning(MockSender::new);
918
919 let _b = client.clone();
920 }
921
922 #[tokio::test]
923 async fn client_recv() {
924 let client = mock_client(&[], ConnStatus::Connected);
925
926 let exp = DeviceEvent {
927 interface: "interface".to_string(),
928 path: "path".to_string(),
929 data: Value::Individual {
930 data: AstarteData::LongInteger(42),
931 timestamp: Utc::now(),
932 },
933 };
934
935 client.events.send(Ok(exp.clone())).await.unwrap();
936
937 let event = client.recv().await.unwrap();
938
939 assert_eq!(event, exp);
940 }
941
942 #[tokio::test]
943 async fn client_disconnect_closed() {
944 let mut client = mock_client(&[], ConnStatus::Disconnected);
945
946 let mut seq = Sequence::new();
947 client
948 .sender
949 .expect_disconnect()
950 .once()
951 .in_sequence(&mut seq)
952 .returning(|| Ok(()));
953
954 client.disconnect().await.unwrap();
955
956 client.disconnect.recv().await.unwrap();
957 }
958}