detsys_ids_client/
worker.rs1use tokio::sync::mpsc::channel;
2use tokio::task::JoinHandle;
3use tracing::Instrument;
4
5use crate::collator::{Collator, SnapshotError};
6use crate::configuration_proxy::{ConfigurationProxy, ConfigurationProxyError};
7use crate::ds_correlation::Correlation;
8use crate::identity::AnonymousDistinctId;
9use crate::storage::Storage;
10use crate::submitter::Submitter;
11use crate::system_snapshot::SystemSnapshotter;
12use crate::transport::Transport;
13use crate::{DeviceId, DistinctId, Map, Recorder};
14
15pub struct Worker {
16 collator_task: JoinHandle<Result<(), SnapshotError>>,
17 submitter_task: JoinHandle<()>,
18 configuration_task: JoinHandle<Result<(), ConfigurationProxyError>>,
19}
20
21impl Worker {
22 #[cfg_attr(
23 feature = "tracing-instrument",
24 tracing::instrument(skip(
25 anonymous_distinct_id,
26 distinct_id,
27 device_id,
28 facts,
29 groups,
30 system_snapshotter,
31 storage,
32 transport
33 ))
34 )]
35 #[allow(clippy::too_many_arguments)]
36 pub(crate) async fn new<F: SystemSnapshotter, P: Storage, T: Transport + Sync + 'static>(
37 anonymous_distinct_id: Option<AnonymousDistinctId>,
38 distinct_id: Option<DistinctId>,
39 device_id: Option<DeviceId>,
40 facts: Option<Map>,
41 groups: Option<Map>,
42 system_snapshotter: F,
43 storage: P,
44 transport: T,
45 ) -> (Recorder, Worker) {
46 let (to_configuration_proxy, configuration_proxy_rx) = channel(1000);
52 let (to_collator, collator_rx) = channel(1000);
53 let (to_submitter, submitter_rx) = channel(1000);
54
55 let recorder = Recorder::new(to_collator.clone(), to_configuration_proxy);
56 let configuration =
57 ConfigurationProxy::new(transport.clone(), configuration_proxy_rx, to_collator);
58 let collator = Collator::new(
59 system_snapshotter,
60 storage,
61 collator_rx,
62 to_submitter,
63 anonymous_distinct_id,
64 distinct_id,
65 device_id,
66 facts.unwrap_or_default(),
67 groups.unwrap_or_default(),
68 Correlation::import(),
69 )
70 .await;
71 let submitter = Submitter::new(transport, submitter_rx);
72
73 let span = tracing::debug_span!("spawned worker");
74
75 let collator_task = tokio::spawn(collator.execute().instrument(span.clone()));
76 let configuration_task = tokio::spawn(configuration.execute().instrument(span.clone()));
77 let submitter_task = tokio::spawn(submitter.execute().instrument(span));
78
79 let worker = Self {
80 collator_task,
81 configuration_task,
82 submitter_task,
83 };
84
85 recorder
86 .trigger_configuration_refresh()
87 .instrument(tracing::debug_span!("Initial configuration sync"))
88 .await;
89
90 (recorder, worker)
91 }
92
93 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
94 pub async fn wait(self) {
95 if let Err(e) = self.configuration_task.await {
106 tracing::trace!(%e, "IDS Transport configuration task ended with an error");
107 }
108
109 if let Err(e) = self.collator_task.await {
110 tracing::trace!(%e, "IDS Transport event system_snapshotter ended with an error");
111 }
112
113 if let Err(e) = self.submitter_task.await {
114 tracing::trace!(%e, "IDS Transport event submitter ended with an error");
115 }
116 }
117}