1use std::ffi::OsStr;
23use std::fmt::Debug;
24use std::fs;
25use std::future::Future;
26use std::io;
27use std::num::NonZero;
28use std::path::Path;
29use std::path::PathBuf;
30use std::str::FromStr;
31use std::sync::Arc;
32use std::time::Duration;
33
34use astarte_interfaces::Interface;
35use tracing::debug;
36
37use crate::Error;
38use crate::client::DeviceClient;
39use crate::connection::DeviceConnection;
40use crate::interfaces::Interfaces;
41use crate::introspection::AddInterfaceError;
42use crate::retention::RetentionError;
43use crate::retention::StoredRetention;
44use crate::retention::memory::VolatileStore;
45use crate::retry::ExponentialIter;
46use crate::retry::RandomExponentialIter;
47use crate::state::SharedState;
48use crate::store::PropertyStore;
49use crate::store::StoreCapabilities;
50use crate::store::sqlite::SqliteError;
51use crate::store::wrapper::StoreWrapper;
52use crate::transport::Connection;
53
54pub const DEFAULT_CHANNEL_SIZE: NonZero<usize> = NonZero::new(50).unwrap();
60
61pub const DEFAULT_VOLATILE_CAPACITY: NonZero<usize> = NonZero::new(1000).unwrap();
63
64pub const DEFAULT_STORE_CAPACITY: NonZero<usize> = NonZero::new(1_000_000).unwrap();
66
67pub const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
71
72pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
77
78pub const DEFAULT_SLOW_RECEIVE_THRESHOLDS: Duration = Duration::from_secs(10);
80
81pub const DEFAULT_BACKOFF_MAXIMUM_DELAY: Duration = Duration::from_secs(256);
83
84pub const DEFAULT_BACKOFF_RESET_INTERVAL: Duration = Duration::from_secs(256 * 4);
86
87pub const DEFAULT_BACKOFF_JITTER_PERCENTAGE: u8 = 50;
89
90#[non_exhaustive]
94#[derive(thiserror::Error, Debug)]
95pub enum BuilderError {
96 #[error("couldn't read interface path {}", .path.display())]
98 Io {
99 path: PathBuf,
101 #[source]
103 backtrace: io::Error,
104 },
105 #[error("couldn't get metadata for {}", .path.display())]
107 DirectoryMetadata {
108 path: PathBuf,
110 #[source]
112 backtrace: io::Error,
113 },
114 #[error("invalid directory at {}", .0.display())]
116 NotADirectory(PathBuf),
117 #[error("directory is read only {}", .0.display())]
119 DirectoryReadonly(PathBuf),
120 #[error("couldn't connect to the SQLite store")]
122 Sqlite(#[from] SqliteError),
123 #[error("couldn't set the maximum number of items in the store")]
125 Retention(#[from] RetentionError),
126}
127
128#[derive(Debug, Clone)]
130#[non_exhaustive]
131pub struct Config {
132 pub writable_dir: Option<PathBuf>,
134 pub channel_size: NonZero<usize>,
136 pub connection_timeout: Duration,
138 pub send_timeout: Duration,
140 pub recv_timeout: Duration,
142 pub slow_receive: Duration,
144 pub exponential_backoff_max: Duration,
146 pub exponential_backoff_reset: Duration,
148 pub exponential_backoff_jitter: u8,
150}
151
152impl Default for Config {
153 fn default() -> Self {
154 Self {
155 writable_dir: None,
156 channel_size: DEFAULT_CHANNEL_SIZE,
157 connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
158 send_timeout: DEFAULT_REQUEST_TIMEOUT,
159 recv_timeout: DEFAULT_REQUEST_TIMEOUT,
160 slow_receive: DEFAULT_SLOW_RECEIVE_THRESHOLDS,
161 exponential_backoff_max: DEFAULT_BACKOFF_MAXIMUM_DELAY,
162 exponential_backoff_reset: DEFAULT_BACKOFF_RESET_INTERVAL,
163 exponential_backoff_jitter: RandomExponentialIter::DEFAULT_RANDOM_JITTER_RANGE,
164 }
165 }
166}
167
168#[derive(Debug, Clone, Copy)]
170pub struct NoStore;
171#[derive(Debug, Clone, Copy)]
173pub struct NoConnect;
174
175#[derive(Debug)]
177pub struct BuildConfig<S> {
178 pub store: S,
180 pub state: Arc<SharedState>,
182}
183
184#[derive(Debug)]
187pub struct DeviceBuilder<C = NoConnect, S = NoStore> {
188 config: Config,
189 interfaces: Interfaces,
190 stored_retention: NonZero<usize>,
191 volatile_retention: NonZero<usize>,
192 store: S,
193 connection_config: C,
194}
195
196impl DeviceBuilder<NoConnect, NoStore> {
197 pub fn new() -> Self {
212 Self {
213 volatile_retention: DEFAULT_VOLATILE_CAPACITY,
214 stored_retention: DEFAULT_STORE_CAPACITY,
215 interfaces: Interfaces::new(),
216 connection_config: NoConnect,
217 store: NoStore,
218 config: Config::default(),
219 }
220 }
221}
222
223impl<S, C> DeviceBuilder<S, C> {
224 pub fn interface_file<P>(self, path: P) -> Result<Self, AddInterfaceError>
229 where
230 P: AsRef<Path>,
231 {
232 let interface = fs::read_to_string(path.as_ref()).map_err(|err| AddInterfaceError::Io {
233 path: path.as_ref().to_path_buf(),
234 backtrace: err,
235 })?;
236
237 self.interface_str(&interface)
238 .map_err(|err| err.add_path_context(path.as_ref().to_owned()))
239 }
240
241 pub fn interface_str(self, interface: &str) -> Result<Self, AddInterfaceError> {
246 let interface = Interface::from_str(interface)?;
247
248 self.interface(interface)
249 }
250
251 pub fn interface(mut self, interface: Interface) -> Result<Self, AddInterfaceError> {
256 debug!("adding interface {}", interface.interface_name());
257
258 let interface = self.interfaces.validate(interface)?;
259
260 let Some(interface) = interface else {
261 debug!("interface already present");
262
263 return Ok(self);
264 };
265
266 self.interfaces.add(interface);
267
268 Ok(self)
269 }
270
271 pub fn interface_directory<P>(self, interfaces_directory: P) -> Result<Self, AddInterfaceError>
273 where
274 P: AsRef<Path>,
275 {
276 walk_dir_json(&interfaces_directory)
277 .map_err(|err| AddInterfaceError::Io {
278 path: interfaces_directory.as_ref().to_path_buf(),
279 backtrace: err,
280 })?
281 .iter()
282 .try_fold(self, |acc, path| acc.interface_file(path))
283 }
284
285 pub fn channel_size(mut self, size: NonZero<usize>) -> Self {
287 self.config.channel_size = size;
288
289 self
290 }
291
292 pub fn writable_dir<P>(mut self, path: P) -> Self
294 where
295 P: AsRef<Path>,
296 {
297 let path = path.as_ref().to_path_buf();
298
299 self.config.writable_dir = Some(path.to_owned());
300
301 self
302 }
303
304 pub fn max_volatile_retention(mut self, items: NonZero<usize>) -> Self {
306 self.volatile_retention = items;
307
308 self
309 }
310
311 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
314 self.config.connection_timeout = timeout;
315
316 self
317 }
318
319 pub fn send_timeout(mut self, timeout: Duration) -> Self {
321 self.config.send_timeout = timeout;
322
323 self
324 }
325
326 pub fn slow_receive_threshold(mut self, threshold: Duration) -> Self {
331 self.config.slow_receive = threshold;
332
333 self
334 }
335
336 pub fn exponential_backoff_max(mut self, duration: Duration) -> Self {
345 self.config.exponential_backoff_max = duration;
346
347 self
348 }
349
350 pub fn exponential_backoff_reset(mut self, interval: Duration) -> Self {
353 self.config.exponential_backoff_reset = interval;
354
355 self
356 }
357
358 pub fn exponential_backoff_jitter(mut self, percentage: u8) -> Self {
363 self.config.exponential_backoff_jitter = percentage;
364
365 self
366 }
367}
368
369impl<C> DeviceBuilder<C, NoStore> {
370 pub fn store<S>(self, store: S) -> DeviceBuilder<C, S>
374 where
375 S: PropertyStore,
376 {
377 DeviceBuilder {
378 volatile_retention: self.volatile_retention,
379 config: self.config,
380 stored_retention: self.stored_retention,
381 connection_config: self.connection_config,
382 interfaces: self.interfaces,
383 store,
384 }
385 }
386}
387
388impl<S> DeviceBuilder<NoConnect, S>
389where
390 S: StoredRetention,
391{
392 pub fn max_stored_retention(mut self, items: NonZero<usize>) -> Self {
394 self.stored_retention = items;
395
396 self
397 }
398}
399
400impl<S> DeviceBuilder<NoConnect, S>
401where
402 S: PropertyStore,
403{
404 pub fn connection<C>(self, connection_config: C) -> DeviceBuilder<C, S>
409 where
410 C: ConnectionConfig<S>,
411 {
412 DeviceBuilder {
413 interfaces: self.interfaces,
414 store: self.store,
415 volatile_retention: self.volatile_retention,
416 stored_retention: self.stored_retention,
417 config: self.config,
418 connection_config,
419 }
420 }
421}
422
423type BuildRes<C> = (DeviceClient<C>, DeviceConnection<C>);
424
425impl<C, S> DeviceBuilder<C, S>
426where
427 S: StoreCapabilities,
428 C: ConnectionConfig<S>,
429 Error: From<C::Err>,
430{
431 pub async fn build(self) -> Result<BuildRes<C::Conn>, Error> {
434 if let Some(path) = &self.config.writable_dir {
435 tokio::fs::create_dir_all(path)
436 .await
437 .map_err(|backtrace| BuilderError::Io {
438 path: path.clone(),
439 backtrace,
440 })?;
441 }
442
443 let (events_tx, events_rx) = async_channel::bounded(self.config.channel_size.get());
444 let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
445
446 let volatile_store = VolatileStore::with_capacity(self.volatile_retention.get());
447
448 let backoff = RandomExponentialIter::with_jitter(
449 ExponentialIter::new(
450 self.config.exponential_backoff_max,
451 self.config.exponential_backoff_reset,
452 ),
453 self.config.exponential_backoff_jitter,
454 );
455
456 let state = Arc::new(SharedState::new(
457 self.config,
458 self.interfaces,
459 volatile_store,
460 ));
461
462 let config = BuildConfig {
463 store: self.store,
464 state: Arc::clone(&state),
465 };
466
467 let DeviceTransport {
468 connection,
469 sender,
470 store,
471 } = self.connection_config.connect(config).await?;
472
473 let (client_state, connection_state) = state.split();
474
475 let client = DeviceClient::new(
476 sender.clone(),
477 events_rx,
478 store.clone(),
479 client_state,
480 disconnect_tx,
481 );
482
483 let connection = DeviceConnection::new(
484 events_tx,
485 disconnect_rx,
486 store,
487 connection_state,
488 connection,
489 sender,
490 backoff,
491 );
492
493 connection.init_store(self.stored_retention).await?;
494
495 Ok((client, connection))
496 }
497}
498
499impl Default for DeviceBuilder {
500 fn default() -> Self {
501 Self::new()
502 }
503}
504
505pub struct DeviceTransport<C>
507where
508 C: Connection,
509{
510 pub(crate) connection: C,
511 pub(crate) sender: C::Sender,
512 pub(crate) store: StoreWrapper<C::Store>,
513}
514
515pub trait ConnectionConfig<S> {
517 type Store: StoreCapabilities;
519 type Conn: Connection;
521 type Err;
523
524 fn connect(
527 self,
528 config: BuildConfig<S>,
529 ) -> impl Future<Output = Result<DeviceTransport<Self::Conn>, Self::Err>> + Send
530 where
531 S: PropertyStore;
532}
533
534fn walk_dir_json<P>(path: P) -> Result<Vec<PathBuf>, io::Error>
536where
537 P: AsRef<Path>,
538{
539 std::fs::read_dir(path)?
540 .map(|res| {
541 res.and_then(|entry| {
542 let path = entry.path();
543 let metadata = entry.metadata()?;
544
545 Ok((path, metadata))
546 })
547 })
548 .filter_map(|res| match res {
549 Ok((path, metadata)) => {
550 if metadata.is_file() && path.extension() == Some(OsStr::new("json")) {
551 Some(Ok(path))
552 } else {
553 None
554 }
555 }
556 Err(e) => Some(Err(e)),
557 })
558 .collect()
559}
560
561#[cfg(test)]
562mod test {
563 use std::time::Duration;
564
565 use mockall::Sequence;
566 use tempfile::TempDir;
567
568 use crate::store::memory::MemoryStore;
569 use crate::test::DEVICE_PROPERTIES;
570 use crate::transport::mock::{MockCon, MockConfig, MockSender};
571
572 use super::*;
573
574 #[test]
575 fn interface_directory() {
576 let res =
577 DeviceBuilder::new().interface_directory("examples/individual_datastream/interfaces");
578
579 assert!(
580 res.is_ok(),
581 "Failed to load interfaces from directory: {res:?}"
582 );
583 }
584
585 #[test]
586 fn interface_existing_directory() {
587 let res = DeviceBuilder::new()
588 .interface_directory("examples/individual_datastream/interfaces")
589 .unwrap()
590 .interface_directory("examples/individual_datastream/interfaces");
591
592 assert!(
593 res.is_ok(),
594 "Failed to load interfaces from directory: {res:?}"
595 );
596 }
597
598 #[test]
599 fn should_get_writable_path() {
600 let dir = TempDir::new().unwrap();
601
602 let builder = DeviceBuilder::new().writable_dir(dir.path());
603
604 assert_eq!(builder.config.writable_dir, Some(dir.path().to_owned()))
605 }
606
607 #[tokio::test]
608 async fn should_build() {
609 let dir = TempDir::new().unwrap();
610
611 let mut config = MockConfig::<MemoryStore>::new();
613 let mut seq = Sequence::new();
614
615 let tmp_path = Some(dir.path().to_path_buf());
616
617 config
618 .expect_connect()
619 .once()
620 .in_sequence(&mut seq)
621 .withf(
622 move |BuildConfig {
623 state,
624 ..
625 }| {
626 state.config.writable_dir == tmp_path
627 && state.interfaces.try_read().unwrap().get("org.astarte-platform.rust.examples.individual-properties.DeviceProperties").is_some()
628 },
629 )
630 .returning(|config| {
631 let mut sender = MockSender::new();
632 let mut seq = Sequence::new();
633 sender.expect_clone().once().in_sequence(&mut seq).returning(MockSender::new);
634
635 Ok(DeviceTransport {
636 connection: MockCon::new(),
637 sender,
638 store: StoreWrapper::new(config.store),
639 })
640 });
641
642 let (_client, _connection) = tokio::time::timeout(
643 Duration::from_secs(3),
644 DeviceBuilder::new()
645 .writable_dir(dir.path())
646 .store(MemoryStore::default())
647 .interface_str(DEVICE_PROPERTIES)
648 .unwrap()
649 .connection(config)
650 .build(),
651 )
652 .await
653 .unwrap()
654 .unwrap();
655 }
656
657 #[test]
658 fn test_get_introspection_string() {
659 let options = DeviceBuilder::new()
660 .interface_directory("examples/individual_datastream/interfaces")
661 .expect("Failed to set interface directory");
662
663 let ifa = options.interfaces;
664
665 let expected = [
666 "org.astarte-platform.rust.examples.individual-datastream.DeviceDatastream:0:1",
667 "org.astarte-platform.rust.examples.individual-datastream.ServerDatastream:0:1",
668 ];
669
670 let intro = ifa.get_introspection_string();
671 let mut res: Vec<&str> = intro.split(';').collect();
672
673 res.sort_unstable();
674
675 assert_eq!(res, expected);
676 }
677}