Skip to main content

astarte_device_sdk/client/
mod.rs

1// This file is part of Astarte.
2//
3// Copyright 2024-2026 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
19//! Client to send data to astarte, add interfaces or access properties.
20
21use std::future::Future;
22
23use astarte_interfaces::{MappingPath, interface::Retention, mapping::path::MappingPathError};
24use chrono::{DateTime, Utc};
25use tracing::{debug, error, info, trace, warn};
26
27use crate::{
28    Error,
29    event::DeviceEvent,
30    state::{ClientState, ConnStatus},
31    store::wrapper::StoreWrapper,
32    types::AstarteData,
33    validate::{ValidatedIndividual, ValidatedObject},
34};
35use crate::{
36    aggregate::AstarteObject,
37    error::{AggregationError, InterfaceTypeError},
38    logging::security::{SecurityEvent, notify_security_event},
39    retention::{
40        Id, RetentionId, StoredRetention, StoredRetentionExt,
41        memory::{ItemValue, VolatileItemError},
42        stored_mark_unsent, volatile_mark_unsent,
43    },
44    store::StoreCapabilities,
45    transport::{
46        Connection, Publish,
47        mqtt::{Mqtt, error::MqttError},
48    },
49};
50use crate::{error::DynError, transport::Disconnect};
51
52mod individual;
53mod introspection;
54mod object;
55mod property;
56
57/// Error generated by or received from the connection.
58#[non_exhaustive]
59#[derive(thiserror::Error, Debug)]
60pub enum RecvError {
61    /// A generic connection related error.
62    ///
63    /// Should be downcasted to access the underling specific connection error.
64    #[error("connection error, {0:?}")]
65    Connection(#[source] DynError),
66    /// Couldn't parse the mapping path.
67    #[error("invalid mapping path")]
68    InvalidEndpoint(#[from] MappingPathError),
69    /// Couldn't find an interface with the given name.
70    #[error("couldn't find interface '{name}'")]
71    InterfaceNotFound {
72        /// Name of the missing interface.
73        name: String,
74    },
75    /// Couldn't find missing mapping in the interface.
76    #[error("couldn't find mapping {mapping} in interface {interface}")]
77    MappingNotFound {
78        /// Name of the interface.
79        interface: String,
80        /// Path of the missing mapping.
81        mapping: String,
82    },
83    /// Received a message without timestamp, on an interface with `explicit_timestamp`
84    #[error("message received with missing explicit timestamp on {interface_name}{path}")]
85    MissingTimestamp {
86        /// The interface we received the data on.
87        interface_name: String,
88        /// The mapping path we received on.
89        path: String,
90    },
91    /// Received unset on property without `allow_unset`
92    #[error("unset received on property {interface_name}{path} without allow_unset")]
93    Unset {
94        /// The interface we received the data on.
95        interface_name: String,
96        /// The mapping path we received on.
97        path: String,
98    },
99    /// Invalid aggregation between the interface and the data.
100    #[error("invalid aggregation between interface and data")]
101    Aggregation(#[from] AggregationError),
102    /// Invalid aggregation between the interface and the data.
103    #[error("invalid interface type between interface and data")]
104    InterfaceType(#[from] InterfaceTypeError),
105    /// Received data on a device owned interface.
106    #[error("received data on a device owned interface {interface}{path}")]
107    Ownership {
108        /// The interface we received on.
109        interface: String,
110        /// The endpoint we received on.
111        path: String,
112    },
113    /// Error when the Device is disconnected from Astarte or client.
114    ///
115    /// This is an unrecoverable error for the SDK.
116    #[error("disconnected from astarte")]
117    Disconnected,
118}
119
120// Safe conversion for connection error
121impl RecvError {
122    pub(crate) fn mqtt_connection_error(value: MqttError) -> Self {
123        RecvError::Connection(value.into())
124    }
125
126    #[cfg(feature = "message-hub")]
127    pub(crate) fn grpc_connection_error(value: crate::transport::grpc::GrpcError) -> Self {
128        RecvError::Connection(value.into())
129    }
130}
131
132/// A trait representing the behavior of an Astarte device client.
133///
134/// A device client is responsible for interacting with the Astarte platform by sending properties
135/// and datastreams, handling events, and managing device interfaces.
136pub trait Client: Clone {
137    /// Send an individual datastream on an interface.
138    ///
139    /// ```no_run
140    /// use astarte_device_sdk::builder::DeviceBuilder;
141    /// use astarte_device_sdk::prelude::*;
142    /// use astarte_device_sdk::store::memory::MemoryStore;
143    /// use astarte_device_sdk::transport::mqtt::{MqttConfig, MqttArgs, Credential};
144    /// use astarte_device_sdk::types::AstarteData;
145    /// use chrono::{TimeZone, Utc};
146    ///
147    /// #[tokio::main]
148    /// async fn main() {
149    ///     let args = MqttArgs{
150    ///         realm: "realm_id".to_string(),
151    ///         device_id: "device_id".to_string(),
152    ///         credential: Credential::secret("credential_secret"),
153    ///         pairing_url: "http://api.astarte.localhost/pairing".parse().expect("a valid URL")
154    ///     };
155    ///     let mqtt_config = MqttConfig::new(args);
156    ///
157    ///     let (mut client, connection) = DeviceBuilder::new().store(MemoryStore::new())
158    ///         .connection(mqtt_config).build().await.unwrap();
159    ///
160    ///     let value: i32 = 42;
161    ///     client.send_individual("my.interface.name", "/endpoint/path", value.into())
162    ///         .await
163    ///         .unwrap();
164    /// }
165    /// ```
166    fn send_individual(
167        &mut self,
168        interface_name: &str,
169        mapping_path: &str,
170        data: AstarteData,
171    ) -> impl Future<Output = Result<(), Error>> + Send;
172
173    /// Send an individual datastream on an interface, with an explicit timestamp.
174    ///
175    /// ```no_run
176    /// use astarte_device_sdk::builder::DeviceBuilder;
177    /// use astarte_device_sdk::prelude::*;
178    /// use astarte_device_sdk::store::memory::MemoryStore;
179    /// use astarte_device_sdk::transport::mqtt::{MqttConfig, MqttArgs, Credential};
180    /// use astarte_device_sdk::types::AstarteData;
181    /// use chrono::{TimeZone, Utc};
182    ///
183    /// #[tokio::main]
184    /// async fn main() {
185    ///     let args = MqttArgs{
186    ///         realm: "realm_id".to_string(),
187    ///         device_id: "device_id".to_string(),
188    ///         credential: Credential::secret("credential_secret"),
189    ///         pairing_url: "http://api.astarte.localhost/pairing".parse().expect("a valid URL")
190    ///     };
191    ///     let mqtt_config = MqttConfig::new(args);
192    ///
193    ///     let (mut client, connection) = DeviceBuilder::new().store(MemoryStore::new())
194    ///         .connection(mqtt_config).build().await.unwrap();
195    ///
196    ///     let value: i32 = 42;
197    ///     let timestamp = Utc.timestamp_opt(1537449422, 0).unwrap();
198    ///     client.send_individual_with_timestamp("my.interface.name", "/endpoint/path", value.into(), timestamp)
199    ///         .await
200    ///         .unwrap();
201    /// }
202    /// ```
203    fn send_individual_with_timestamp(
204        &mut self,
205        interface_name: &str,
206        mapping_path: &str,
207        data: AstarteData,
208        timestamp: chrono::DateTime<chrono::Utc>,
209    ) -> impl Future<Output = Result<(), Error>> + Send;
210
211    /// Send an object datastream on an interface.
212    ///
213    /// The usage is the same of
214    /// [`send_object_with_timestamp`](crate::Client::send_object_with_timestamp),
215    /// without the timestamp.
216    fn send_object(
217        &mut self,
218        interface_name: &str,
219        base_path: &str,
220        data: AstarteObject,
221    ) -> impl Future<Output = Result<(), Error>> + Send;
222
223    /// Send an object datastream on an interface, with an explicit timestamp.
224    ///
225    /// ```no_run
226    /// use astarte_device_sdk::store::memory::MemoryStore;
227    /// use astarte_device_sdk::builder::DeviceBuilder;
228    /// use astarte_device_sdk::transport::mqtt::{MqttConfig, Credential, MqttArgs};
229    /// use astarte_device_sdk::types::AstarteData;
230    /// use astarte_device_sdk::prelude::*;
231    /// # #[cfg(feature = "derive")]
232    /// use astarte_device_sdk::IntoAstarteObject;
233    /// # #[cfg(not(feature = "derive"))]
234    /// # use astarte_device_sdk_derive::IntoAstarteObject;
235    /// use chrono::{TimeZone, Utc};
236    ///
237    /// #[derive(IntoAstarteObject)]
238    /// struct TestObject {
239    ///     endpoint1: f64,
240    ///     endpoint2: bool,
241    /// }
242    ///
243    /// #[tokio::main]
244    /// async fn main() {
245    ///     let args = MqttArgs {
246    ///         realm: "realm_id".to_string(),
247    ///         device_id: "device_id".to_string(),
248    ///         credential: Credential::secret("credential_secret"),
249    ///         pairing_url: "http://api.astarte.localhost/pairing".parse().expect("a valid URL")
250    ///     };
251    ///     let mqtt_config = MqttConfig::new(args);
252    ///
253    ///     let (mut client, connection) = DeviceBuilder::new().store(MemoryStore::new())
254    ///         .connection(mqtt_config).build().await.unwrap();
255    ///
256    ///     let data = TestObject {
257    ///         endpoint1: 1.34,
258    ///         endpoint2: false
259    ///     };
260    ///     let timestamp = Utc.timestamp_opt(1537449422, 0).unwrap();
261    ///     client.send_object_with_timestamp("my.interface.name", "/endpoint/path", data.try_into().unwrap(), timestamp)
262    ///         .await
263    ///         .unwrap();
264    /// }
265    /// ```
266    fn send_object_with_timestamp(
267        &mut self,
268        interface_name: &str,
269        base_path: &str,
270        data: AstarteObject,
271        timestamp: chrono::DateTime<chrono::Utc>,
272    ) -> impl Future<Output = Result<(), Error>> + Send;
273
274    /// Send an individual datastream on an interface.
275    ///
276    /// ```no_run
277    /// use astarte_device_sdk::builder::DeviceBuilder;
278    /// use astarte_device_sdk::prelude::*;
279    /// use astarte_device_sdk::store::memory::MemoryStore;
280    /// use astarte_device_sdk::transport::mqtt::{MqttConfig, MqttArgs, Credential};
281    /// use astarte_device_sdk::types::AstarteData;
282    /// use chrono::{TimeZone, Utc};
283    ///
284    /// #[tokio::main]
285    /// async fn main() {
286    ///     let args = MqttArgs{
287    ///         realm: "realm_id".to_string(),
288    ///         device_id: "device_id".to_string(),
289    ///         credential: Credential::secret("credential_secret"),
290    ///         pairing_url: "http://api.astarte.localhost/pairing".parse().expect("a valid URL")
291    ///     };
292    ///     let mqtt_config = MqttConfig::new(args);
293    ///
294    ///     let (mut client, connection) = DeviceBuilder::new().store(MemoryStore::new())
295    ///         .connection(mqtt_config).build().await.unwrap();
296    ///
297    ///     let value: i32 = 42;
298    ///     client.set_property("my.interface.name", "/endpoint/path", value.into())
299    ///         .await
300    ///         .unwrap();
301    /// }
302    /// ```
303    fn set_property(
304        &mut self,
305        interface_name: &str,
306        mapping_path: &str,
307        data: AstarteData,
308    ) -> impl Future<Output = Result<(), Error>> + Send;
309
310    /// Unset a device property.
311    ///
312    /// ```no_run
313    /// use astarte_device_sdk::builder::DeviceBuilder;
314    /// use astarte_device_sdk::prelude::*;
315    /// use astarte_device_sdk::store::memory::MemoryStore;
316    /// use astarte_device_sdk::transport::mqtt::{MqttConfig, MqttArgs, Credential};
317    /// use astarte_device_sdk::types::AstarteData;
318    ///
319    /// #[tokio::main]
320    /// async fn main() {
321    ///     let args = MqttArgs {
322    ///         realm: "realm_id".to_string(),
323    ///         device_id: "device_id".to_string(),
324    ///         credential: Credential::secret("credential_secret"),
325    ///         pairing_url: "http://api.astarte.localhost/pairing".parse().expect("a valid URL")
326    ///     };
327    ///     let mqtt_config = MqttConfig::new(args);
328    ///
329    ///     let (mut device, _connection) = DeviceBuilder::new().store(MemoryStore::new())
330    ///         .connection(mqtt_config).build().await.unwrap();
331    ///
332    ///     device
333    ///         .unset_property("my.interface.name", "/endpoint/path",)
334    ///         .await
335    ///         .unwrap();
336    /// }
337    /// ```
338    fn unset_property(
339        &mut self,
340        interface_name: &str,
341        mapping_path: &str,
342    ) -> impl Future<Output = Result<(), Error>> + Send;
343
344    /// Receives an event from Astarte.
345    ///
346    /// # Note
347    ///
348    /// An event can only be received once, so if the client is cloned only one of the clients
349    /// instances will receive the message.
350    fn recv(&self) -> impl Future<Output = Result<DeviceEvent, RecvError>> + Send;
351}
352
353/// A trait representing the behavior of an Astarte device client to disconnect itself from Astarte.
354pub trait ClientDisconnect {
355    /// Cleanly disconnects the client consuming it.
356    fn disconnect(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
357}
358
359/// Client to send and receive message to and form Astarte or access the Device properties.
360///
361/// ### Notes
362///
363/// Cloning the client will not broadcast the [`DeviceEvent`]. Each message can
364/// only be received once.
365#[derive(Debug)]
366pub struct DeviceClient<C>
367where
368    C: Connection,
369{
370    // Sender of the connection.
371    sender: C::Sender,
372    // We use multi producer multi consumer instead of the mpsc channel for the DeviceEvents for the connection to che
373    // client since we need the Receiver end to be cloneable.
374    // The tokio Broadcast channel provides an async mpmc, but suffer from the "slow receiver" problem.
375    events: async_channel::Receiver<Result<DeviceEvent, RecvError>>,
376    pub(crate) disconnect: async_channel::Sender<()>,
377    pub(crate) store: StoreWrapper<C::Store>,
378    pub(crate) state: ClientState,
379}
380
381impl<C> DeviceClient<C>
382where
383    C: Connection,
384{
385    pub(crate) fn new(
386        sender: C::Sender,
387        rx: async_channel::Receiver<Result<DeviceEvent, RecvError>>,
388        store: StoreWrapper<C::Store>,
389        state: ClientState,
390        disconnect: async_channel::Sender<()>,
391    ) -> Self {
392        Self {
393            sender,
394            events: rx,
395            store,
396            state,
397            disconnect,
398        }
399    }
400
401    async fn send<T>(
402        state: &ClientState,
403        store: &StoreWrapper<C::Store>,
404        sender: &mut C::Sender,
405        data: T,
406    ) -> Result<(), Error>
407    where
408        T: ClientPacket + TryInto<ItemValue, Error = VolatileItemError> + Clone,
409        C::Store: StoreCapabilities,
410        C::Sender: Publish,
411    {
412        match state.connection().await {
413            ConnStatus::Connected => {
414                trace!("publish while connection is connected");
415            }
416            ConnStatus::Disconnected => {
417                trace!("publish while connection is offline");
418                return Self::offline_send(state, store, sender, data).await;
419            }
420            ConnStatus::Closed => {
421                trace!("publish while connection is closed");
422
423                if let Err(error) = Self::offline_send(state, store, sender, data).await {
424                    error!(%error, "couldn't store the send");
425                }
426
427                return Err(Error::Disconnected);
428            }
429        }
430
431        match data.get_retention() {
432            Retention::Volatile { .. } => Self::send_volatile(state, sender, data).await,
433            Retention::Stored { .. } => Self::send_stored(state, store, sender, data).await,
434            Retention::Discard => data.send(sender).await,
435        }
436    }
437
438    async fn offline_send<T>(
439        state: &ClientState,
440        store: &StoreWrapper<C::Store>,
441        sender: &mut C::Sender,
442        data: T,
443    ) -> Result<(), Error>
444    where
445        T: ClientPacket + TryInto<ItemValue, Error = VolatileItemError>,
446        C::Store: StoreCapabilities,
447        C::Sender: Publish,
448    {
449        match data.get_retention() {
450            Retention::Discard => {
451                debug!("drop publish with retention discard since disconnected");
452            }
453            Retention::Volatile { .. } => {
454                let id = state.retention_ctx().next();
455
456                state.volatile_store().push_unsent(id, data).await;
457            }
458            Retention::Stored { .. } => {
459                let id = state.retention_ctx().next();
460
461                if let Some(retention) = store.get_retention() {
462                    data.store_publish(&id, sender, retention, false).await?;
463                } else {
464                    warn!(
465                        ?store,
466                        "storing interface with retention 'Stored' in volatile store since the store doesn't support retention"
467                    );
468                    state.volatile_store().push_unsent(id, data).await;
469                }
470            }
471        }
472
473        Ok(())
474    }
475
476    async fn send_stored<T>(
477        state: &ClientState,
478        store: &StoreWrapper<C::Store>,
479        sender: &mut C::Sender,
480        data: T,
481    ) -> Result<(), Error>
482    where
483        T: ClientPacket + TryInto<ItemValue, Error = VolatileItemError> + Clone,
484        C::Store: StoreCapabilities,
485        C::Sender: Publish,
486    {
487        let Some(retention) = store.get_retention() else {
488            warn!(
489                ?store,
490                "storing interface with retention 'Stored' in volatile store since the store doesn't support retention"
491            );
492            return Self::send_volatile(state, sender, data).await;
493        };
494
495        // generate id after the check to avoid wasting an id generation in case it gets regenerated in send_volatile
496        let id = state.retention_ctx().next();
497
498        data.store_publish(&id, sender, retention, true).await?;
499
500        let result = data.send_stored(RetentionId::Stored(id), sender).await;
501
502        if result.is_err() {
503            error!("error while sending stored marking unsent");
504            stored_mark_unsent(store, &id).await;
505        }
506
507        result
508    }
509
510    async fn send_volatile<T>(
511        state: &ClientState,
512        sender: &mut C::Sender,
513        data: T,
514    ) -> Result<(), Error>
515    where
516        T: ClientPacket + TryInto<ItemValue, Error = VolatileItemError> + Clone,
517        C::Store: StoreCapabilities,
518        C::Sender: Publish,
519    {
520        let id = state.retention_ctx().next();
521
522        state.volatile_store().push_sent(id, data.clone()).await;
523
524        let result = data.send_stored(RetentionId::Volatile(id), sender).await;
525
526        if result.is_err() {
527            error!("error while sending volatile marking unsent");
528            volatile_mark_unsent(state.volatile_store(), &id).await;
529        }
530
531        result
532    }
533}
534
535impl<S> DeviceClient<Mqtt<S>>
536where
537    S: StoreCapabilities,
538{
539    /// Retrieve the expiry (not_after) timestamp of the current certificate
540    pub async fn get_cert_expiry(&self) -> Option<DateTime<Utc>> {
541        self.state.cert_expiry().await
542    }
543
544    /// Retrieve the expiry (not_after) timestamp of the current certificate
545    /// Note that this function will log a security event if the feature is enabled
546    /// when the certificate will expire at the passed datetime
547    pub async fn is_valid_at(&self, check_dt: DateTime<Utc>) -> Option<bool> {
548        let expiry = self.get_cert_expiry().await?;
549
550        if check_dt < expiry {
551            Some(true)
552        } else {
553            notify_security_event(SecurityEvent::CertificateAboutToExpire);
554
555            Some(false)
556        }
557    }
558}
559
560// Cannot be derived it has specific generic bounds.
561impl<C> Clone for DeviceClient<C>
562where
563    C: Connection,
564{
565    fn clone(&self) -> Self {
566        Self {
567            sender: self.sender.clone(),
568            events: self.events.clone(),
569            store: self.store.clone(),
570            state: self.state.clone(),
571            disconnect: self.disconnect.clone(),
572        }
573    }
574}
575
576impl<C> Client for DeviceClient<C>
577where
578    C: Connection,
579    C::Sender: Publish,
580{
581    async fn send_object_with_timestamp(
582        &mut self,
583        interface_name: &str,
584        base_path: &str,
585        data: AstarteObject,
586        timestamp: chrono::DateTime<chrono::Utc>,
587    ) -> Result<(), Error> {
588        let path = MappingPath::try_from(base_path)?;
589
590        self.send_datastream_object(interface_name, &path, data, Some(timestamp))
591            .await
592    }
593
594    async fn send_object(
595        &mut self,
596        interface_name: &str,
597        base_path: &str,
598        data: AstarteObject,
599    ) -> Result<(), Error> {
600        let path = MappingPath::try_from(base_path)?;
601
602        self.send_datastream_object(interface_name, &path, data, None)
603            .await
604    }
605
606    async fn send_individual(
607        &mut self,
608        interface_name: &str,
609        mapping_path: &str,
610        data: AstarteData,
611    ) -> Result<(), Error> {
612        let path = MappingPath::try_from(mapping_path)?;
613
614        self.send_datastream_individual(interface_name, &path, data, None)
615            .await
616    }
617
618    async fn send_individual_with_timestamp(
619        &mut self,
620        interface_name: &str,
621        mapping_path: &str,
622        data: AstarteData,
623        timestamp: chrono::DateTime<chrono::Utc>,
624    ) -> Result<(), Error> {
625        let mapping = MappingPath::try_from(mapping_path)?;
626
627        self.send_datastream_individual(interface_name, &mapping, data, Some(timestamp))
628            .await
629    }
630
631    async fn set_property(
632        &mut self,
633        interface_name: &str,
634        mapping_path: &str,
635        data: AstarteData,
636    ) -> Result<(), Error> {
637        trace!("setting property {}{}", interface_name, mapping_path);
638
639        let path = MappingPath::try_from(mapping_path)?;
640
641        self.send_property(interface_name, &path, data).await
642    }
643
644    async fn unset_property(
645        &mut self,
646        interface_name: &str,
647        mapping_path: &str,
648    ) -> Result<(), Error> {
649        trace!("unsetting {}{}", interface_name, mapping_path);
650
651        let path = MappingPath::try_from(mapping_path)?;
652
653        self.send_unset(interface_name, &path).await
654    }
655
656    async fn recv(&self) -> Result<DeviceEvent, RecvError> {
657        self.events
658            .recv()
659            .await
660            .map_err(|_| RecvError::Disconnected)?
661    }
662}
663
664impl<C> ClientDisconnect for DeviceClient<C>
665where
666    C: Connection,
667    C::Sender: Disconnect,
668{
669    async fn disconnect(&mut self) -> Result<(), Error> {
670        if self.state.connection().await == ConnStatus::Closed {
671            debug!("connection already closed");
672
673            return Ok(());
674        }
675
676        self.sender.disconnect().await?;
677
678        info!("device disconnected");
679
680        if let Err(error) = self.disconnect.try_send(()) {
681            error!(%error, "multiple clients trying to disconnect");
682        }
683
684        Ok(())
685    }
686}
687
688trait ClientPacket {
689    fn get_retention(&self) -> Retention;
690
691    fn serialize<S>(&self, sender: &S) -> Result<Vec<u8>, crate::Error>
692    where
693        S: Publish;
694
695    fn send<S>(self, sender: &mut S) -> impl Future<Output = Result<(), crate::Error>> + Send
696    where
697        S: Publish + Send;
698
699    fn send_stored<S>(
700        self,
701        id: RetentionId,
702        sender: &mut S,
703    ) -> impl Future<Output = Result<(), crate::Error>> + Send
704    where
705        S: Publish + Send;
706
707    fn store_publish<S, R>(
708        &self,
709        id: &Id,
710        sender: &S,
711        retention: &R,
712        sent: bool,
713    ) -> impl Future<Output = Result<(), crate::Error>> + Send
714    where
715        S: Publish + Sync,
716        R: StoredRetention + Sync;
717}
718
719impl ClientPacket for ValidatedIndividual {
720    fn get_retention(&self) -> Retention {
721        self.retention
722    }
723
724    fn serialize<S>(&self, sender: &S) -> Result<Vec<u8>, crate::Error>
725    where
726        S: Publish,
727    {
728        sender.serialize_individual(self)
729    }
730
731    async fn send<S>(self, sender: &mut S) -> Result<(), crate::Error>
732    where
733        S: Publish + Send,
734    {
735        sender.send_individual(self).await
736    }
737
738    async fn send_stored<S>(self, id: RetentionId, sender: &mut S) -> Result<(), crate::Error>
739    where
740        S: Publish,
741    {
742        sender.send_individual_stored(id, self).await
743    }
744
745    async fn store_publish<S, R>(
746        &self,
747        id: &Id,
748        sender: &S,
749        retention: &R,
750        sent: bool,
751    ) -> Result<(), crate::Error>
752    where
753        S: Publish + Sync,
754        R: StoredRetention + Sync,
755    {
756        let serialized = self.serialize(sender)?;
757
758        retention
759            .store_publish_individual(id, self, &serialized, sent)
760            .await
761            .map_err(crate::Error::from)
762    }
763}
764
765impl ClientPacket for ValidatedObject {
766    fn get_retention(&self) -> Retention {
767        self.retention
768    }
769
770    fn serialize<S>(&self, sender: &S) -> Result<Vec<u8>, crate::Error>
771    where
772        S: Publish,
773    {
774        sender.serialize_object(self)
775    }
776
777    async fn send<S>(self, sender: &mut S) -> Result<(), crate::Error>
778    where
779        S: Publish + Send,
780    {
781        sender.send_object(self).await
782    }
783
784    async fn send_stored<S>(self, id: RetentionId, sender: &mut S) -> Result<(), crate::Error>
785    where
786        S: Publish + Send,
787    {
788        sender.send_object_stored(id, self).await
789    }
790
791    async fn store_publish<S, R>(
792        &self,
793        id: &Id,
794        sender: &S,
795        retention: &R,
796        sent: bool,
797    ) -> Result<(), crate::Error>
798    where
799        S: Publish + Sync,
800        R: StoredRetention + Sync,
801    {
802        let serialized = self.serialize(sender)?;
803
804        retention
805            .store_publish_object(id, self, &serialized, sent)
806            .await
807            .map_err(crate::Error::from)
808    }
809}
810
#[cfg(test)]
pub(crate) mod tests {
    use std::ops::{Deref, DerefMut};
    use std::str::FromStr;
    use std::sync::Arc;

    use astarte_interfaces::Interface;
    use chrono::Utc;
    use mockall::Sequence;
    use pretty_assertions::assert_eq;

    use crate::Value;
    use crate::builder::{Config, DEFAULT_CHANNEL_SIZE, DEFAULT_VOLATILE_CAPACITY};
    use crate::interfaces::Interfaces;
    use crate::retention::memory::VolatileStore;
    use crate::state::SharedState;
    use crate::store::StoreCapabilities;
    use crate::store::memory::MemoryStore;
    use crate::transport::mock::{MockCon, MockSender};

    use super::*;

    /// Client over a mocked connection, bundled with the opposite ends of its channels.
    ///
    /// It dereferences to the wrapped [`DeviceClient`].
    pub(crate) struct TestClient<S>
    where
        S: StoreCapabilities,
    {
        client: DeviceClient<MockCon<S>>,
        /// Receiving end of the disconnection channel of the client.
        pub(crate) disconnect: async_channel::Receiver<()>,
        /// Sending end of the events channel of the client.
        pub(crate) events: async_channel::Sender<Result<DeviceEvent, RecvError>>,
    }

    impl<S> Deref for TestClient<S>
    where
        S: StoreCapabilities,
    {
        type Target = DeviceClient<MockCon<S>>;

        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }

    impl<S> DerefMut for TestClient<S>
    where
        S: StoreCapabilities,
    {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.client
        }
    }

    /// Creates a mocked client backed by a new [`MemoryStore`].
    pub(crate) fn mock_client(
        interfaces: &[&str],
        initial_status: ConnStatus,
    ) -> TestClient<MemoryStore> {
        let store = MemoryStore::new();

        mock_client_with_store(interfaces, initial_status, store)
    }

    /// Creates a mocked client with the given interfaces, connection status and store.
    ///
    /// Panics if one of the interfaces cannot be parsed.
    pub(crate) fn mock_client_with_store<S>(
        interfaces: &[&str],
        initial_status: ConnStatus,
        store: S,
    ) -> TestClient<S>
    where
        S: StoreCapabilities,
    {
        let parsed = interfaces
            .iter()
            .map(|raw| Interface::from_str(raw).unwrap());

        let mut shared = SharedState::new(
            Config::default(),
            Interfaces::from_iter(parsed),
            VolatileStore::with_capacity(DEFAULT_VOLATILE_CAPACITY.get()),
        );
        *shared.status.get_mut() = initial_status;

        let (events, events_rx) = async_channel::bounded(DEFAULT_CHANNEL_SIZE.get());
        let (disconnect_tx, disconnect) = async_channel::bounded(1);

        let client = DeviceClient::new(
            MockSender::new(),
            events_rx,
            StoreWrapper::new(store),
            ClientState::new(Arc::new(shared)),
            disconnect_tx,
        );

        TestClient {
            client,
            disconnect,
            events,
        }
    }

    #[test]
    fn client_must_be_clone() {
        let mut client = mock_client(&[], ConnStatus::Connected);

        let mut order = Sequence::new();
        client
            .sender
            .expect_clone()
            .once()
            .in_sequence(&mut order)
            .returning(MockSender::new);

        let _cloned = client.clone();
    }

    #[tokio::test]
    async fn client_recv() {
        let client = mock_client(&[], ConnStatus::Connected);

        let data = Value::Individual {
            data: AstarteData::LongInteger(42),
            timestamp: Utc::now(),
        };
        let expected = DeviceEvent {
            interface: "interface".to_string(),
            path: "path".to_string(),
            data,
        };

        // Push the event on the sending end kept by the test wrapper.
        client.events.send(Ok(expected.clone())).await.unwrap();

        let received = client.recv().await.unwrap();

        assert_eq!(received, expected);
    }

    #[tokio::test]
    async fn client_disconnect_closed() {
        let mut client = mock_client(&[], ConnStatus::Disconnected);

        let mut order = Sequence::new();
        client
            .sender
            .expect_disconnect()
            .once()
            .in_sequence(&mut order)
            .returning(|| Ok(()));

        client.disconnect().await.unwrap();

        // The disconnection must have been signalled on the channel.
        client.disconnect.recv().await.unwrap();
    }
}
954        client.disconnect().await.unwrap();
955
956        client.disconnect.recv().await.unwrap();
957    }
958}