metrix/
attached_mount.rs

1use crossbeam_channel::{
2    self, Receiver as CrossbeamReceiver, Sender as CrossbeamSender, TryRecvError,
3};
4
5use crate::{
6    processor::ProcessesTelemetryMessages, processor::ProcessingOutcome,
7    processor::ProcessingStrategy, processor::ProcessorMount, snapshot::Snapshot,
8    AggregatesProcessors, PutsSnapshot,
9};
10
11#[derive(Clone)]
12pub struct AttachedMount {
13    pub(crate) sender: CrossbeamSender<ScopedMountMessage>,
14}
15
16impl AttachedMount {
17    pub fn attach_mount<P: ProcessesTelemetryMessages>(
18        &self,
19        mount: ProcessorMount,
20    ) -> AttachedMount {
21        let (sender, receiver) = crossbeam_channel::unbounded();
22
23        let attached = InternalAttachedMount {
24            receiver: Some(receiver),
25            inner: mount,
26        };
27
28        self.put_processor(attached);
29
30        AttachedMount { sender }
31    }
32
33    pub fn put_processor<P: ProcessesTelemetryMessages>(&self, processor: P) {
34        let _ = self
35            .sender
36            .send(ScopedMountMessage::Processor(Box::new(processor)));
37    }
38
39    pub fn put_snapshooter<S: PutsSnapshot>(&self, snapshooter: S) {
40        let _ = self
41            .sender
42            .send(ScopedMountMessage::Snapshooter(Box::new(snapshooter)));
43    }
44}
45
46impl AggregatesProcessors for AttachedMount {
47    fn add_processor<P: ProcessesTelemetryMessages>(&mut self, processor: P) {
48        let _ = self
49            .sender
50            .send(ScopedMountMessage::Processor(Box::new(processor)));
51    }
52
53    fn add_snapshooter<S: PutsSnapshot>(&mut self, snapshooter: S) {
54        let _ = self
55            .sender
56            .send(ScopedMountMessage::Snapshooter(Box::new(snapshooter)));
57    }
58}
59
60pub(crate) enum ScopedMountMessage {
61    Processor(Box<dyn ProcessesTelemetryMessages>),
62    Snapshooter(Box<dyn PutsSnapshot>),
63}
64
65pub(crate) struct InternalAttachedMount {
66    pub receiver: Option<CrossbeamReceiver<ScopedMountMessage>>,
67    pub inner: ProcessorMount,
68}
69
70impl PutsSnapshot for InternalAttachedMount {
71    fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
72        self.inner.put_snapshot(into, descriptive);
73    }
74}
75
76impl ProcessesTelemetryMessages for InternalAttachedMount {
77    fn process(&mut self, max: usize, strategy: ProcessingStrategy) -> ProcessingOutcome {
78        if let Some(the_receiver) = self.receiver.take() {
79            let still_connected = loop {
80                match the_receiver.try_recv() {
81                    Ok(ScopedMountMessage::Processor(p)) => {
82                        self.inner.add_processor_dyn(p);
83                    }
84                    Ok(ScopedMountMessage::Snapshooter(s)) => {
85                        self.inner.add_snapshooter_dyn(s);
86                    }
87                    Err(TryRecvError::Empty) => {
88                        break true;
89                    }
90                    Err(TryRecvError::Disconnected) => break false,
91                }
92            };
93
94            if still_connected {
95                self.receiver = Some(the_receiver);
96            }
97        }
98
99        self.inner.process(max, strategy)
100    }
101}