detsys_ids_client/
worker.rs1use tokio::sync::mpsc::channel;
2use tokio::task::JoinHandle;
3use tracing::Instrument;
4
5use crate::collator::{Collator, SnapshotError};
6use crate::configuration_proxy::{ConfigurationProxy, ConfigurationProxyError};
7use crate::ds_correlation::Correlation;
8use crate::identity::AnonymousDistinctId;
9use crate::storage::Storage;
10use crate::submitter::Submitter;
11use crate::system_snapshot::SystemSnapshotter;
12use crate::transport::Transport;
13use crate::{DeviceId, DistinctId, Map, Recorder};
14
15pub struct Worker {
16 collator_task: JoinHandle<Result<(), SnapshotError>>,
17 submitter_task: JoinHandle<()>,
18 configuration_task: JoinHandle<Result<(), ConfigurationProxyError>>,
19}
20
21impl Worker {
22 #[cfg_attr(
23 feature = "tracing-instrument",
24 tracing::instrument(skip(
25 anonymous_distinct_id,
26 distinct_id,
27 device_id,
28 facts,
29 groups,
30 system_snapshotter,
31 storage,
32 transport
33 ))
34 )]
35 #[allow(clippy::too_many_arguments)]
36 pub(crate) async fn new<F: SystemSnapshotter, P: Storage, T: Transport + Sync + 'static>(
37 anonymous_distinct_id: Option<AnonymousDistinctId>,
38 distinct_id: Option<DistinctId>,
39 device_id: Option<DeviceId>,
40 facts: Option<Map>,
41 groups: Option<Map>,
42 system_snapshotter: F,
43 storage: P,
44 transport: T,
45 ) -> (Recorder, Worker) {
46 let (to_configuration_proxy, configuration_proxy_rx) = channel(1000);
52 let (to_collator, collator_rx) = channel(1000);
53 let (to_submitter, submitter_rx) = channel(1000);
54
55 let recorder = Recorder::new(to_collator, to_configuration_proxy);
56 let configuration = ConfigurationProxy::new(transport.clone(), configuration_proxy_rx);
57 let collator = Collator::new(
58 system_snapshotter,
59 storage,
60 collator_rx,
61 to_submitter,
62 anonymous_distinct_id,
63 distinct_id,
64 device_id,
65 facts.unwrap_or_default(),
66 groups.unwrap_or_default(),
67 Correlation::import(),
68 )
69 .await;
70 let submitter = Submitter::new(transport, submitter_rx);
71
72 let span = tracing::debug_span!("spawned worker");
73
74 let collator_task = tokio::spawn(collator.execute().instrument(span.clone()));
75 let configuration_task = tokio::spawn(configuration.execute().instrument(span.clone()));
76 let submitter_task = tokio::spawn(submitter.execute().instrument(span));
77
78 let worker = Self {
79 collator_task,
80 configuration_task,
81 submitter_task,
82 };
83
84 recorder
85 .trigger_configuration_refresh()
86 .instrument(tracing::debug_span!("Initial configuration sync"))
87 .await;
88
89 (recorder, worker)
90 }
91
92 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
93 pub async fn wait(self) {
94 if let Err(e) = self.configuration_task.await {
105 tracing::trace!(%e, "IDS Transport configuration task ended with an error");
106 }
107
108 if let Err(e) = self.collator_task.await {
109 tracing::trace!(%e, "IDS Transport event system_snapshotter ended with an error");
110 }
111
112 if let Err(e) = self.submitter_task.await {
113 tracing::trace!(%e, "IDS Transport event submitter ended with an error");
114 }
115 }
116}