detsys_ids_client/
worker.rs

1use tokio::sync::mpsc::channel;
2use tokio::task::JoinHandle;
3use tracing::Instrument;
4
5use crate::collator::{Collator, SnapshotError};
6use crate::configuration_proxy::{ConfigurationProxy, ConfigurationProxyError};
7use crate::ds_correlation::Correlation;
8use crate::identity::AnonymousDistinctId;
9use crate::storage::Storage;
10use crate::submitter::Submitter;
11use crate::system_snapshot::SystemSnapshotter;
12use crate::transport::Transport;
13use crate::{DeviceId, DistinctId, Map, Recorder};
14
15pub struct Worker {
16    collator_task: JoinHandle<Result<(), SnapshotError>>,
17    submitter_task: JoinHandle<()>,
18    configuration_task: JoinHandle<Result<(), ConfigurationProxyError>>,
19}
20
21impl Worker {
22    #[cfg_attr(
23        feature = "tracing-instrument",
24        tracing::instrument(skip(
25            anonymous_distinct_id,
26            distinct_id,
27            device_id,
28            facts,
29            groups,
30            system_snapshotter,
31            storage,
32            transport
33        ))
34    )]
35    #[allow(clippy::too_many_arguments)]
36    pub(crate) async fn new<F: SystemSnapshotter, P: Storage, T: Transport + Sync + 'static>(
37        anonymous_distinct_id: Option<AnonymousDistinctId>,
38        distinct_id: Option<DistinctId>,
39        device_id: Option<DeviceId>,
40        facts: Option<Map>,
41        groups: Option<Map>,
42        system_snapshotter: F,
43        storage: P,
44        transport: T,
45    ) -> (Recorder, Worker) {
46        // Message flow:
47        //
48        // Recorder --> Configuration --\
49        //          `----> Collator -------> Submitter
50
51        let (to_configuration_proxy, configuration_proxy_rx) = channel(1000);
52        let (to_collator, collator_rx) = channel(1000);
53        let (to_submitter, submitter_rx) = channel(1000);
54
55        let recorder = Recorder::new(to_collator.clone(), to_configuration_proxy);
56        let configuration =
57            ConfigurationProxy::new(transport.clone(), configuration_proxy_rx, to_collator);
58        let collator = Collator::new(
59            system_snapshotter,
60            storage,
61            collator_rx,
62            to_submitter,
63            anonymous_distinct_id,
64            distinct_id,
65            device_id,
66            facts.unwrap_or_default(),
67            groups.unwrap_or_default(),
68            Correlation::import(),
69        )
70        .await;
71        let submitter = Submitter::new(transport, submitter_rx);
72
73        let span = tracing::debug_span!("spawned worker");
74
75        let collator_task = tokio::spawn(collator.execute().instrument(span.clone()));
76        let configuration_task = tokio::spawn(configuration.execute().instrument(span.clone()));
77        let submitter_task = tokio::spawn(submitter.execute().instrument(span));
78
79        let worker = Self {
80            collator_task,
81            configuration_task,
82            submitter_task,
83        };
84
85        recorder
86            .trigger_configuration_refresh()
87            .instrument(tracing::debug_span!("Initial configuration sync"))
88            .await;
89
90        (recorder, worker)
91    }
92
93    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
94    pub async fn wait(self) {
95        // Note these three tasks have to shut down in this order.
96        //
97        // They are also all tokio::spawn'd, so they are all executing in the background, without needing to be awaited.
98        //
99        // The Submitter won't shut down if the Collator is still running.
100        // The ConfigurationProxy and Collator tasks won't shut down if any Recorders are still out there.
101        //
102        // I'm liking keeping these shut down in this explicit order so we
103        // don't accidentally create a more complicated situation where these
104        // tasks will (sometimes) never shut down.
105        if let Err(e) = self.configuration_task.await {
106            tracing::trace!(%e, "IDS Transport configuration task ended with an error");
107        }
108
109        if let Err(e) = self.collator_task.await {
110            tracing::trace!(%e, "IDS Transport event system_snapshotter ended with an error");
111        }
112
113        if let Err(e) = self.submitter_task.await {
114            tracing::trace!(%e, "IDS Transport event submitter ended with an error");
115        }
116    }
117}