1use crossbeam_channel::{
2 self, Receiver as CrossbeamReceiver, Sender as CrossbeamSender, TryRecvError,
3};
4
5use crate::{
6 processor::ProcessesTelemetryMessages, processor::ProcessingOutcome,
7 processor::ProcessingStrategy, processor::ProcessorMount, snapshot::Snapshot,
8 AggregatesProcessors, PutsSnapshot,
9};
10
11#[derive(Clone)]
12pub struct AttachedMount {
13 pub(crate) sender: CrossbeamSender<ScopedMountMessage>,
14}
15
16impl AttachedMount {
17 pub fn attach_mount<P: ProcessesTelemetryMessages>(
18 &self,
19 mount: ProcessorMount,
20 ) -> AttachedMount {
21 let (sender, receiver) = crossbeam_channel::unbounded();
22
23 let attached = InternalAttachedMount {
24 receiver: Some(receiver),
25 inner: mount,
26 };
27
28 self.put_processor(attached);
29
30 AttachedMount { sender }
31 }
32
33 pub fn put_processor<P: ProcessesTelemetryMessages>(&self, processor: P) {
34 let _ = self
35 .sender
36 .send(ScopedMountMessage::Processor(Box::new(processor)));
37 }
38
39 pub fn put_snapshooter<S: PutsSnapshot>(&self, snapshooter: S) {
40 let _ = self
41 .sender
42 .send(ScopedMountMessage::Snapshooter(Box::new(snapshooter)));
43 }
44}
45
46impl AggregatesProcessors for AttachedMount {
47 fn add_processor<P: ProcessesTelemetryMessages>(&mut self, processor: P) {
48 let _ = self
49 .sender
50 .send(ScopedMountMessage::Processor(Box::new(processor)));
51 }
52
53 fn add_snapshooter<S: PutsSnapshot>(&mut self, snapshooter: S) {
54 let _ = self
55 .sender
56 .send(ScopedMountMessage::Snapshooter(Box::new(snapshooter)));
57 }
58}
59
60pub(crate) enum ScopedMountMessage {
61 Processor(Box<dyn ProcessesTelemetryMessages>),
62 Snapshooter(Box<dyn PutsSnapshot>),
63}
64
65pub(crate) struct InternalAttachedMount {
66 pub receiver: Option<CrossbeamReceiver<ScopedMountMessage>>,
67 pub inner: ProcessorMount,
68}
69
70impl PutsSnapshot for InternalAttachedMount {
71 fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
72 self.inner.put_snapshot(into, descriptive);
73 }
74}
75
76impl ProcessesTelemetryMessages for InternalAttachedMount {
77 fn process(&mut self, max: usize, strategy: ProcessingStrategy) -> ProcessingOutcome {
78 if let Some(the_receiver) = self.receiver.take() {
79 let still_connected = loop {
80 match the_receiver.try_recv() {
81 Ok(ScopedMountMessage::Processor(p)) => {
82 self.inner.add_processor_dyn(p);
83 }
84 Ok(ScopedMountMessage::Snapshooter(s)) => {
85 self.inner.add_snapshooter_dyn(s);
86 }
87 Err(TryRecvError::Empty) => {
88 break true;
89 }
90 Err(TryRecvError::Disconnected) => break false,
91 }
92 };
93
94 if still_connected {
95 self.receiver = Some(the_receiver);
96 }
97 }
98
99 self.inner.process(max, strategy)
100 }
101}