detsys_ids_client/
worker.rs

1use tokio::sync::mpsc::channel;
2use tokio::task::JoinHandle;
3use tracing::Instrument;
4
5use crate::collator::{Collator, SnapshotError};
6use crate::configuration_proxy::{ConfigurationProxy, ConfigurationProxyError};
7use crate::ds_correlation::Correlation;
8use crate::identity::AnonymousDistinctId;
9use crate::storage::Storage;
10use crate::submitter::Submitter;
11use crate::system_snapshot::SystemSnapshotter;
12use crate::transport::Transport;
13use crate::{DeviceId, DistinctId, Groups, Map, Recorder};
14
15pub struct Worker {
16    collator_task: JoinHandle<Result<(), SnapshotError>>,
17    submitter_task: JoinHandle<()>,
18    configuration_task: JoinHandle<Result<(), ConfigurationProxyError>>,
19}
20
21impl Worker {
22    #[cfg_attr(
23        feature = "tracing-instrument",
24        tracing::instrument(skip(
25            anonymous_distinct_id,
26            distinct_id,
27            device_id,
28            facts,
29            groups,
30            system_snapshotter,
31            storage,
32            transport
33        ))
34    )]
35    #[allow(clippy::too_many_arguments)]
36    pub(crate) async fn new<F: SystemSnapshotter, P: Storage, T: Transport + Sync + 'static>(
37        anonymous_distinct_id: Option<AnonymousDistinctId>,
38        distinct_id: Option<DistinctId>,
39        device_id: Option<DeviceId>,
40        facts: Option<Map>,
41        groups: Option<Groups>,
42        system_snapshotter: F,
43        storage: P,
44        transport: T,
45    ) -> (Recorder, Worker) {
46        // Message flow:
47        //
48        // Recorder --> Configuration --\
49        //          `----> Collator -------> Submitter
50
51        let (to_configuration_proxy, configuration_proxy_rx) = channel(1000);
52        let (to_collator, collator_rx) = channel(1000);
53        let (to_submitter, submitter_rx) = channel(1000);
54
55        let recorder = Recorder::new(to_collator.clone(), to_configuration_proxy);
56        let mut configuration =
57            ConfigurationProxy::new(transport.clone(), configuration_proxy_rx, to_collator);
58        let collator = Collator::new(
59            system_snapshotter,
60            storage,
61            collator_rx,
62            to_submitter,
63            anonymous_distinct_id,
64            distinct_id,
65            device_id,
66            facts.unwrap_or_default(),
67            groups.unwrap_or_default(),
68            Correlation::import(),
69        )
70        .await;
71        let submitter = Submitter::new(transport, submitter_rx);
72
73        configuration
74            .bootstrap_checkin(collator.get_checkin().cloned())
75            .await;
76
77        let span = tracing::debug_span!("spawned worker");
78
79        let collator_task = tokio::spawn(collator.execute().instrument(span.clone()));
80        let configuration_task = tokio::spawn(configuration.execute().instrument(span.clone()));
81        let submitter_task = tokio::spawn(submitter.execute().instrument(span));
82
83        let worker = Self {
84            collator_task,
85            configuration_task,
86            submitter_task,
87        };
88
89        (recorder, worker)
90    }
91
92    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
93    pub async fn wait(self) {
94        // Note these three tasks have to shut down in this order.
95        //
96        // They are also all tokio::spawn'd, so they are all executing in the background, without needing to be awaited.
97        //
98        // The Submitter won't shut down if the Collator is still running.
99        // The ConfigurationProxy and Collator tasks won't shut down if any Recorders are still out there.
100        //
101        // I'm liking keeping these shut down in this explicit order so we
102        // don't accidentally create a more complicated situation where these
103        // tasks will (sometimes) never shut down.
104        if let Err(e) = self.configuration_task.await {
105            tracing::trace!(%e, "IDS Transport configuration task ended with an error");
106        }
107
108        if let Err(e) = self.collator_task.await {
109            tracing::trace!(%e, "IDS Transport event system_snapshotter ended with an error");
110        }
111
112        if let Err(e) = self.submitter_task.await {
113            tracing::trace!(%e, "IDS Transport event submitter ended with an error");
114        }
115    }
116}