Skip to main content

astarte_device_sdk/
builder.rs

1// This file is part of Astarte.
2//
3// Copyright 2021-2026 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
19//! Provides functionality to configure an instance of the [`DeviceClient`] and
20//! [`DeviceConnection`].
21
22use std::ffi::OsStr;
23use std::fmt::Debug;
24use std::fs;
25use std::future::Future;
26use std::io;
27use std::num::NonZero;
28use std::path::Path;
29use std::path::PathBuf;
30use std::str::FromStr;
31use std::sync::Arc;
32use std::time::Duration;
33
34use astarte_interfaces::Interface;
35use tracing::debug;
36
37use crate::Error;
38use crate::client::DeviceClient;
39use crate::connection::DeviceConnection;
40use crate::interfaces::Interfaces;
41use crate::introspection::AddInterfaceError;
42use crate::retention::RetentionError;
43use crate::retention::StoredRetention;
44use crate::retention::memory::VolatileStore;
45use crate::retry::ExponentialIter;
46use crate::retry::RandomExponentialIter;
47use crate::state::SharedState;
48use crate::store::PropertyStore;
49use crate::store::StoreCapabilities;
50use crate::store::sqlite::SqliteError;
51use crate::store::wrapper::StoreWrapper;
52use crate::transport::Connection;
53
54/// Default capacity of the channels
55///
56/// This constant is the default bounded channel size for *both* the rumqttc AsyncClient and
57/// EventLoop and the internal channel used by the [`DeviceClient`] and [`DeviceConnection`] to send
58/// events data to the external receiver and between each component.
59pub const DEFAULT_CHANNEL_SIZE: NonZero<usize> = NonZero::new(50).unwrap();
60
61/// Default capacity for the number of packets with retention volatile to store in memory.
62pub const DEFAULT_VOLATILE_CAPACITY: NonZero<usize> = NonZero::new(1000).unwrap();
63
64/// Default capacity for the number of packets w ith retention store to store in memory.
65pub const DEFAULT_STORE_CAPACITY: NonZero<usize> = NonZero::new(1_000_000).unwrap();
66
67/// Default connection timeout.
68///
69/// This is the timeout for establishing a connection for the transport.
70pub const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
71
72/// Default send and receive timeout.
73///
74/// This is not the complete timeout of the whole connection process, it's a timeout applied per
75/// request.
76pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
77
78/// Default receiver warning threshold.
79pub const DEFAULT_SLOW_RECEIVE_THRESHOLDS: Duration = Duration::from_secs(10);
80
81/// Default maximum delay for the backoff.
82pub const DEFAULT_BACKOFF_MAXIMUM_DELAY: Duration = Duration::from_secs(256);
83
84/// Default reset interval for the backoff.
85pub const DEFAULT_BACKOFF_RESET_INTERVAL: Duration = Duration::from_secs(256 * 4);
86
87/// Default random jitter percentage to add/subtract to the delay.
88pub const DEFAULT_BACKOFF_JITTER_PERCENTAGE: u8 = 50;
89
90/// Astarte builder error.
91///
92/// Possible errors used by the Astarte builder module.
93#[non_exhaustive]
94#[derive(thiserror::Error, Debug)]
95pub enum BuilderError {
96    /// Failed to read interface directory
97    #[error("couldn't read interface path {}", .path.display())]
98    Io {
99        /// Path to the interface file.
100        path: PathBuf,
101        /// Reason why the file couldn't be read.
102        #[source]
103        backtrace: io::Error,
104    },
105    /// Couldn't get the metadata of the writable directory
106    #[error("couldn't get metadata for {}", .path.display())]
107    DirectoryMetadata {
108        /// Path to the interface directory.
109        path: PathBuf,
110        /// Reason why the directory or file couldn't be read.
111        #[source]
112        backtrace: io::Error,
113    },
114    /// Provided path is not a directory
115    #[error("invalid directory at {}", .0.display())]
116    NotADirectory(PathBuf),
117    /// The provided directory is read only
118    #[error("directory is read only {}", .0.display())]
119    DirectoryReadonly(PathBuf),
120    /// Couldn't connect to the SQLite store
121    #[error("couldn't connect to the SQLite store")]
122    Sqlite(#[from] SqliteError),
123    /// Couldn't set the maximum number of items in the store
124    #[error("couldn't set the maximum number of items in the store")]
125    Retention(#[from] RetentionError),
126}
127
128/// Configuration options and parameters for the connection.
129#[derive(Debug, Clone)]
130#[non_exhaustive]
131pub struct Config {
132    /// Optional writable directory
133    pub writable_dir: Option<PathBuf>,
134    /// Channel size
135    pub channel_size: NonZero<usize>,
136    /// Connection timeout
137    pub connection_timeout: Duration,
138    /// Send or write timeout
139    pub send_timeout: Duration,
140    /// Recv or read timeout
141    pub recv_timeout: Duration,
142    /// Slow receiver threshold for warning
143    pub slow_receive: Duration,
144    /// Maximum period the backoff will wait for.
145    pub exponential_backoff_max: Duration,
146    /// Period till we reset the exponential backoff to the default value.
147    pub exponential_backoff_reset: Duration,
148    /// Percentage jitter to add to the backoff.
149    pub exponential_backoff_jitter: u8,
150}
151
152impl Default for Config {
153    fn default() -> Self {
154        Self {
155            writable_dir: None,
156            channel_size: DEFAULT_CHANNEL_SIZE,
157            connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
158            send_timeout: DEFAULT_REQUEST_TIMEOUT,
159            recv_timeout: DEFAULT_REQUEST_TIMEOUT,
160            slow_receive: DEFAULT_SLOW_RECEIVE_THRESHOLDS,
161            exponential_backoff_max: DEFAULT_BACKOFF_MAXIMUM_DELAY,
162            exponential_backoff_reset: DEFAULT_BACKOFF_RESET_INTERVAL,
163            exponential_backoff_jitter: RandomExponentialIter::DEFAULT_RANDOM_JITTER_RANGE,
164        }
165    }
166}
167
168/// Marker struct to identify a builder with no store configured
169#[derive(Debug, Clone, Copy)]
170pub struct NoStore;
171/// Marker struct to identify a builder with no connection configured
172#[derive(Debug, Clone, Copy)]
173pub struct NoConnect;
174
175/// Struct used to pass the connection configuration to the [`ConnectionConfig`]
176#[derive(Debug)]
177pub struct BuildConfig<S> {
178    /// Store used for the connection.
179    pub store: S,
180    /// Shared state of the connection.
181    pub state: Arc<SharedState>,
182}
183
184/// Structure used to store the configuration options for an instance of [`DeviceClient`] and
185/// [`DeviceConnection`].
186#[derive(Debug)]
187pub struct DeviceBuilder<C = NoConnect, S = NoStore> {
188    config: Config,
189    interfaces: Interfaces,
190    stored_retention: NonZero<usize>,
191    volatile_retention: NonZero<usize>,
192    store: S,
193    connection_config: C,
194}
195
196impl DeviceBuilder<NoConnect, NoStore> {
197    /// Create a new instance of the DeviceBuilder.
198    /// Has a default [`DeviceBuilder::channel_size`] that equals to [`crate::builder::DEFAULT_CHANNEL_SIZE`].
199    ///
200    /// ```no_run
201    /// use astarte_device_sdk::{builder::DeviceBuilder, transport::mqtt::MqttConfig};
202    ///
203    /// #[tokio::main]
204    /// async fn main(){
205    ///     let mut builder =
206    ///         DeviceBuilder::new()
207    ///             .interface_directory("path/to/interfaces")
208    ///             .unwrap();
209    /// }
210    /// ```
211    pub fn new() -> Self {
212        Self {
213            volatile_retention: DEFAULT_VOLATILE_CAPACITY,
214            stored_retention: DEFAULT_STORE_CAPACITY,
215            interfaces: Interfaces::new(),
216            connection_config: NoConnect,
217            store: NoStore,
218            config: Config::default(),
219        }
220    }
221}
222
223impl<S, C> DeviceBuilder<S, C> {
224    /// Add a single interface from the provided `.json` file.
225    ///
226    /// If an interface with the same name is present, the code will validate
227    /// the passed interface to ensure it has a newer version than the one stored.
228    pub fn interface_file<P>(self, path: P) -> Result<Self, AddInterfaceError>
229    where
230        P: AsRef<Path>,
231    {
232        let interface = fs::read_to_string(path.as_ref()).map_err(|err| AddInterfaceError::Io {
233            path: path.as_ref().to_path_buf(),
234            backtrace: err,
235        })?;
236
237        self.interface_str(&interface)
238            .map_err(|err| err.add_path_context(path.as_ref().to_owned()))
239    }
240
241    /// Add a single interface from the provided string.
242    ///
243    /// If an interface with the same name is present, the code will validate
244    /// the passed interface to ensure it has a newer version than the one stored.
245    pub fn interface_str(self, interface: &str) -> Result<Self, AddInterfaceError> {
246        let interface = Interface::from_str(interface)?;
247
248        self.interface(interface)
249    }
250
251    /// Add a single interface.
252    ///
253    /// If an interface with the same name is present, the code will validate
254    /// the passed interface to ensure it has a newer version than the one stored.
255    pub fn interface(mut self, interface: Interface) -> Result<Self, AddInterfaceError> {
256        debug!("adding interface {}", interface.interface_name());
257
258        let interface = self.interfaces.validate(interface)?;
259
260        let Some(interface) = interface else {
261            debug!("interface already present");
262
263            return Ok(self);
264        };
265
266        self.interfaces.add(interface);
267
268        Ok(self)
269    }
270
271    /// Add all the interfaces from the `.json` files contained in the specified folder.
272    pub fn interface_directory<P>(self, interfaces_directory: P) -> Result<Self, AddInterfaceError>
273    where
274        P: AsRef<Path>,
275    {
276        walk_dir_json(&interfaces_directory)
277            .map_err(|err| AddInterfaceError::Io {
278                path: interfaces_directory.as_ref().to_path_buf(),
279                backtrace: err,
280            })?
281            .iter()
282            .try_fold(self, |acc, path| acc.interface_file(path))
283    }
284
285    /// This method configures the bounded channel size.
286    pub fn channel_size(mut self, size: NonZero<usize>) -> Self {
287        self.config.channel_size = size;
288
289        self
290    }
291
292    /// Configure a writable directory for the device.
293    pub fn writable_dir<P>(mut self, path: P) -> Self
294    where
295        P: AsRef<Path>,
296    {
297        let path = path.as_ref().to_path_buf();
298
299        self.config.writable_dir = Some(path.to_owned());
300
301        self
302    }
303
304    /// Set the maximum number of elements that will be kept in memory
305    pub fn max_volatile_retention(mut self, items: NonZero<usize>) -> Self {
306        self.volatile_retention = items;
307
308        self
309    }
310
311    /// Set the timeout used while performing individual HTTP calls
312    /// and used while waiting for a connection to the MQTT server.
313    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
314        self.config.connection_timeout = timeout;
315
316        self
317    }
318
319    /// Set the timeout used while sending data on the connection transport
320    pub fn send_timeout(mut self, timeout: Duration) -> Self {
321        self.config.send_timeout = timeout;
322
323        self
324    }
325
326    /// Set the threshold for slow reception of the Astarte Device
327    ///
328    /// If the events are not dequeued in a timely manner this could impact the state of the Astarte
329    /// connection.
330    pub fn slow_receive_threshold(mut self, threshold: Duration) -> Self {
331        self.config.slow_receive = threshold;
332
333        self
334    }
335
336    /// Sets the maximum timeout generated by the exponential timeout generator.
337    /// Used by the connection to wait in between connection retries.
338    /// The timeout is an exponential timeout with a random jitter added, this duration limits the maximum
339    /// of the exponential part of the timeout, the random jitter will be added on top of this maximum.
340    ///
341    /// ```text
342    /// exponential + random(-jitter..+jitter)
343    /// ```
344    pub fn exponential_backoff_max(mut self, duration: Duration) -> Self {
345        self.config.exponential_backoff_max = duration;
346
347        self
348    }
349
350    /// Reset timeout interval, after this interval of time the exponential part of the timeout
351    /// will be reset to 0.
352    pub fn exponential_backoff_reset(mut self, interval: Duration) -> Self {
353        self.config.exponential_backoff_reset = interval;
354
355        self
356    }
357
358    /// Percentage of random jitter that will be added to the exponential retry timeout.
359    /// From 0 to 100. As an example a jitter percentage of 50 will result in possible timeout values
360    /// ranging from -50% to +50% of the exponential timeout.
361    /// Values grater than 100 will be clamped to 100.
362    pub fn exponential_backoff_jitter(mut self, percentage: u8) -> Self {
363        self.config.exponential_backoff_jitter = percentage;
364
365        self
366    }
367}
368
369impl<C> DeviceBuilder<C, NoStore> {
370    /// Set the backing storage for the device.
371    ///
372    /// This will store and retrieve the device's properties.
373    pub fn store<S>(self, store: S) -> DeviceBuilder<C, S>
374    where
375        S: PropertyStore,
376    {
377        DeviceBuilder {
378            volatile_retention: self.volatile_retention,
379            config: self.config,
380            stored_retention: self.stored_retention,
381            connection_config: self.connection_config,
382            interfaces: self.interfaces,
383            store,
384        }
385    }
386}
387
388impl<S> DeviceBuilder<NoConnect, S>
389where
390    S: StoredRetention,
391{
392    /// Set the maximum number of elements that will be kept in the store
393    pub fn max_stored_retention(mut self, items: NonZero<usize>) -> Self {
394        self.stored_retention = items;
395
396        self
397    }
398}
399
400impl<S> DeviceBuilder<NoConnect, S>
401where
402    S: PropertyStore,
403{
404    /// Configure the connection using the passed [`ConnectionConfig`].
405    ///
406    /// If the connection gets established correctly, the caller can than construct
407    /// the [`DeviceClient`] and [`DeviceConnection`] using the [`DeviceBuilder::build`] method.
408    pub fn connection<C>(self, connection_config: C) -> DeviceBuilder<C, S>
409    where
410        C: ConnectionConfig<S>,
411    {
412        DeviceBuilder {
413            interfaces: self.interfaces,
414            store: self.store,
415            volatile_retention: self.volatile_retention,
416            stored_retention: self.stored_retention,
417            config: self.config,
418            connection_config,
419        }
420    }
421}
422
423type BuildRes<C> = (DeviceClient<C>, DeviceConnection<C>);
424
425impl<C, S> DeviceBuilder<C, S>
426where
427    S: StoreCapabilities,
428    C: ConnectionConfig<S>,
429    Error: From<C::Err>,
430{
431    /// Method that consumes the builder and returns a working [`DeviceClient`] and
432    /// [`DeviceConnection`] with the specified settings.
433    pub async fn build(self) -> Result<BuildRes<C::Conn>, Error> {
434        if let Some(path) = &self.config.writable_dir {
435            tokio::fs::create_dir_all(path)
436                .await
437                .map_err(|backtrace| BuilderError::Io {
438                    path: path.clone(),
439                    backtrace,
440                })?;
441        }
442
443        let (events_tx, events_rx) = async_channel::bounded(self.config.channel_size.get());
444        let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
445
446        let volatile_store = VolatileStore::with_capacity(self.volatile_retention.get());
447
448        let backoff = RandomExponentialIter::with_jitter(
449            ExponentialIter::new(
450                self.config.exponential_backoff_max,
451                self.config.exponential_backoff_reset,
452            ),
453            self.config.exponential_backoff_jitter,
454        );
455
456        let state = Arc::new(SharedState::new(
457            self.config,
458            self.interfaces,
459            volatile_store,
460        ));
461
462        let config = BuildConfig {
463            store: self.store,
464            state: Arc::clone(&state),
465        };
466
467        let DeviceTransport {
468            connection,
469            sender,
470            store,
471        } = self.connection_config.connect(config).await?;
472
473        let (client_state, connection_state) = state.split();
474
475        let client = DeviceClient::new(
476            sender.clone(),
477            events_rx,
478            store.clone(),
479            client_state,
480            disconnect_tx,
481        );
482
483        let connection = DeviceConnection::new(
484            events_tx,
485            disconnect_rx,
486            store,
487            connection_state,
488            connection,
489            sender,
490            backoff,
491        );
492
493        connection.init_store(self.stored_retention).await?;
494
495        Ok((client, connection))
496    }
497}
498
499impl Default for DeviceBuilder {
500    fn default() -> Self {
501        Self::new()
502    }
503}
504
505/// Structure that stores a successfully established connection
506pub struct DeviceTransport<C>
507where
508    C: Connection,
509{
510    pub(crate) connection: C,
511    pub(crate) sender: C::Sender,
512    pub(crate) store: StoreWrapper<C::Store>,
513}
514
515/// Crate private connection implementation.
516pub trait ConnectionConfig<S> {
517    /// Type of the store used by the connection
518    type Store: StoreCapabilities;
519    /// Type of the constructed Connection
520    type Conn: Connection;
521    /// Type of the error got while opening the connection
522    type Err;
523
524    /// Connect method that consumes self to construct a working connection
525    /// This method is called internally by the builder.
526    fn connect(
527        self,
528        config: BuildConfig<S>,
529    ) -> impl Future<Output = Result<DeviceTransport<Self::Conn>, Self::Err>> + Send
530    where
531        S: PropertyStore;
532}
533
534/// Walks a directory returning an array of json files
535fn walk_dir_json<P>(path: P) -> Result<Vec<PathBuf>, io::Error>
536where
537    P: AsRef<Path>,
538{
539    std::fs::read_dir(path)?
540        .map(|res| {
541            res.and_then(|entry| {
542                let path = entry.path();
543                let metadata = entry.metadata()?;
544
545                Ok((path, metadata))
546            })
547        })
548        .filter_map(|res| match res {
549            Ok((path, metadata)) => {
550                if metadata.is_file() && path.extension() == Some(OsStr::new("json")) {
551                    Some(Ok(path))
552                } else {
553                    None
554                }
555            }
556            Err(e) => Some(Err(e)),
557        })
558        .collect()
559}
560
561#[cfg(test)]
562mod test {
563    use std::time::Duration;
564
565    use mockall::Sequence;
566    use tempfile::TempDir;
567
568    use crate::store::memory::MemoryStore;
569    use crate::test::DEVICE_PROPERTIES;
570    use crate::transport::mock::{MockCon, MockConfig, MockSender};
571
572    use super::*;
573
574    #[test]
575    fn interface_directory() {
576        let res =
577            DeviceBuilder::new().interface_directory("examples/individual_datastream/interfaces");
578
579        assert!(
580            res.is_ok(),
581            "Failed to load interfaces from directory: {res:?}"
582        );
583    }
584
585    #[test]
586    fn interface_existing_directory() {
587        let res = DeviceBuilder::new()
588            .interface_directory("examples/individual_datastream/interfaces")
589            .unwrap()
590            .interface_directory("examples/individual_datastream/interfaces");
591
592        assert!(
593            res.is_ok(),
594            "Failed to load interfaces from directory: {res:?}"
595        );
596    }
597
598    #[test]
599    fn should_get_writable_path() {
600        let dir = TempDir::new().unwrap();
601
602        let builder = DeviceBuilder::new().writable_dir(dir.path());
603
604        assert_eq!(builder.config.writable_dir, Some(dir.path().to_owned()))
605    }
606
607    #[tokio::test]
608    async fn should_build() {
609        let dir = TempDir::new().unwrap();
610
611        // TODO: add expectations
612        let mut config = MockConfig::<MemoryStore>::new();
613        let mut seq = Sequence::new();
614
615        let tmp_path = Some(dir.path().to_path_buf());
616
617        config
618            .expect_connect()
619            .once()
620            .in_sequence(&mut seq)
621            .withf(
622                move |BuildConfig {
623                          state,
624                          ..
625                      }| {
626                        state.config.writable_dir == tmp_path
627                        && state.interfaces.try_read().unwrap().get("org.astarte-platform.rust.examples.individual-properties.DeviceProperties").is_some()
628                },
629            )
630            .returning(|config| {
631                let mut sender = MockSender::new();
632                let mut seq = Sequence::new();
633                sender.expect_clone().once().in_sequence(&mut seq).returning(MockSender::new);
634
635                Ok(DeviceTransport {
636                    connection: MockCon::new(),
637                    sender,
638                    store: StoreWrapper::new(config.store),
639                })
640            });
641
642        let (_client, _connection) = tokio::time::timeout(
643            Duration::from_secs(3),
644            DeviceBuilder::new()
645                .writable_dir(dir.path())
646                .store(MemoryStore::default())
647                .interface_str(DEVICE_PROPERTIES)
648                .unwrap()
649                .connection(config)
650                .build(),
651        )
652        .await
653        .unwrap()
654        .unwrap();
655    }
656
657    #[test]
658    fn test_get_introspection_string() {
659        let options = DeviceBuilder::new()
660            .interface_directory("examples/individual_datastream/interfaces")
661            .expect("Failed to set interface directory");
662
663        let ifa = options.interfaces;
664
665        let expected = [
666            "org.astarte-platform.rust.examples.individual-datastream.DeviceDatastream:0:1",
667            "org.astarte-platform.rust.examples.individual-datastream.ServerDatastream:0:1",
668        ];
669
670        let intro = ifa.get_introspection_string();
671        let mut res: Vec<&str> = intro.split(';').collect();
672
673        res.sort_unstable();
674
675        assert_eq!(res, expected);
676    }
677}