detsys_ids_client/
worker.rs

1use tokio::sync::mpsc::channel;
2use tokio::task::JoinHandle;
3use tracing::Instrument;
4
5use crate::collator::{Collator, SnapshotError};
6use crate::configuration_proxy::{ConfigurationProxy, ConfigurationProxyError};
7use crate::ds_correlation::Correlation;
8use crate::identity::AnonymousDistinctId;
9use crate::storage::Storage;
10use crate::submitter::Submitter;
11use crate::system_snapshot::SystemSnapshotter;
12use crate::transport::Transport;
13use crate::{DeviceId, DistinctId, Map, Recorder};
14
15pub struct Worker {
16    collator_task: JoinHandle<Result<(), SnapshotError>>,
17    submitter_task: JoinHandle<()>,
18    configuration_task: JoinHandle<Result<(), ConfigurationProxyError>>,
19}
20
21impl Worker {
22    #[cfg_attr(
23        feature = "tracing-instrument",
24        tracing::instrument(skip(
25            anonymous_distinct_id,
26            distinct_id,
27            device_id,
28            facts,
29            groups,
30            system_snapshotter,
31            storage,
32            transport
33        ))
34    )]
35    #[allow(clippy::too_many_arguments)]
36    pub(crate) async fn new<F: SystemSnapshotter, P: Storage, T: Transport + Sync + 'static>(
37        anonymous_distinct_id: Option<AnonymousDistinctId>,
38        distinct_id: Option<DistinctId>,
39        device_id: Option<DeviceId>,
40        facts: Option<Map>,
41        groups: Option<Map>,
42        system_snapshotter: F,
43        storage: P,
44        transport: T,
45    ) -> (Recorder, Worker) {
46        // Message flow:
47        //
48        // Recorder --> Configuration --\
49        //          `----> Collator -------> Submitter
50
51        let (to_configuration_proxy, configuration_proxy_rx) = channel(1000);
52        let (to_collator, collator_rx) = channel(1000);
53        let (to_submitter, submitter_rx) = channel(1000);
54
55        let recorder = Recorder::new(to_collator, to_configuration_proxy);
56        let configuration = ConfigurationProxy::new(transport.clone(), configuration_proxy_rx);
57        let collator = Collator::new(
58            system_snapshotter,
59            storage,
60            collator_rx,
61            to_submitter,
62            anonymous_distinct_id,
63            distinct_id,
64            device_id,
65            facts.unwrap_or_default(),
66            groups.unwrap_or_default(),
67            Correlation::import(),
68        )
69        .await;
70        let submitter = Submitter::new(transport, submitter_rx);
71
72        let span = tracing::debug_span!("spawned worker");
73
74        let collator_task = tokio::spawn(collator.execute().instrument(span.clone()));
75        let configuration_task = tokio::spawn(configuration.execute().instrument(span.clone()));
76        let submitter_task = tokio::spawn(submitter.execute().instrument(span));
77
78        let worker = Self {
79            collator_task,
80            configuration_task,
81            submitter_task,
82        };
83
84        recorder
85            .trigger_configuration_refresh()
86            .instrument(tracing::debug_span!("Initial configuration sync"))
87            .await;
88
89        (recorder, worker)
90    }
91
92    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
93    pub async fn wait(self) {
94        // Note these three tasks have to shut down in this order.
95        //
96        // They are also all tokio::spawn'd, so they are all executing in the background, without needing to be awaited.
97        //
98        // The Submitter won't shut down if the Collator is still running.
99        // The ConfigurationProxy and Collator tasks won't shut down if any Recorders are still out there.
100        //
101        // I'm liking keeping these shut down in this explicit order so we
102        // don't accidentally create a more complicated situation where these
103        // tasks will (sometimes) never shut down.
104        if let Err(e) = self.configuration_task.await {
105            tracing::trace!(%e, "IDS Transport configuration task ended with an error");
106        }
107
108        if let Err(e) = self.collator_task.await {
109            tracing::trace!(%e, "IDS Transport event system_snapshotter ended with an error");
110        }
111
112        if let Err(e) = self.submitter_task.await {
113            tracing::trace!(%e, "IDS Transport event submitter ended with an error");
114        }
115    }
116}