output_tracker/threadsafe/
mod.rs

1//! Threadsafe variant of [`OutputTracker`] and [`OutputSubject`].
2//!
3//! For an example on how to use it see the crate level documentation.
4
5use crate::inner_subject::{BasicSubject, CelledSubject};
6use crate::inner_tracker::{BasicTracker, CelledTracker};
7use crate::tracker_handle::TrackerHandle;
8use std::sync::{Arc, Mutex, MutexGuard, TryLockError};
9
10/// Error type for the threadsafe [`OutputTracker`] and [`OutputSubject`].
11#[derive(thiserror::Error, Debug)]
12pub enum Error {
13    /// Failed to obtain a lock for the tracker.
14    #[error("failed to obtain a lock for the tracker")]
15    LockTrackerFailed,
16    /// Failed to obtain a lock for the subject.
17    #[error("failed to obtain a lock for the subject")]
18    LockSubjectFailed,
19}
20
21/// A struct that collects state data or action data of any kind.
22///
23/// This is the threadsafe variant.
24///
25/// The tracked data can be read any time and as often as needed by calling the
26/// [`output()`][OutputTracker::output]. Each time the output is read, all data
27/// collected so far are returned. To track only new data emitted after the last
28/// read of the output, the [`clear()`][OutputTracker::clear] function should be
29/// called.
30///
31/// The tracker can be deactivated by calling the [`stop()`][OutputTracker::stop]
32/// function to stop it from collecting data. Once stopped the tracker can not
33/// be activated again.
34#[derive(Debug)]
35pub struct OutputTracker<M> {
36    handle: TrackerHandle,
37    inner: ThreadsafeTracker<M>,
38    subject: ThreadsafeSubject<M>,
39}
40
41impl<M> OutputTracker<M> {
42    const fn new(
43        handle: TrackerHandle,
44        inner: ThreadsafeTracker<M>,
45        subject: ThreadsafeSubject<M>,
46    ) -> Self {
47        Self {
48            handle,
49            inner,
50            subject,
51        }
52    }
53
54    /// Stops this tracker.
55    ///
56    /// After stopping a tracker it no longer tracks emitted data. Once a
57    /// tracker is stopped it can not be activated again.
58    pub fn stop(&self) -> Result<(), Error> {
59        self.subject.remove_tracker(self.handle)
60    }
61
62    /// Clears the data this tracker has been collected so far.
63    ///
64    /// After clearing a tracker it still tracks any data which is emitted after
65    /// this clear function has been called.
66    pub fn clear(&self) -> Result<(), Error> {
67        self.inner.clear()
68    }
69
70    /// Returns the data collected by this tracker so far.
71    ///
72    /// Each time this function is called it returns all data collected since
73    /// the tracker has been created or since the last call to of the
74    /// [`clear()`][OutputTracker::clear] function. To track only data that are
75    /// emitted after the last time the output was read, the
76    /// [`clear()`][OutputTracker::clear] should be called after the output has
77    /// been read.
78    pub fn output(&self) -> Result<Vec<M>, Error>
79    where
80        M: Clone,
81    {
82        self.inner.output()
83    }
84}
85
86/// Holds created [`OutputTracker`]s and emits data to all known trackers.
87///
88/// This is the threadsafe variant.
89///
90/// New [`OutputTracker`]s can be created by calling the
91/// [`create_tracker()`][OutputSubject::create_tracker] function.
92///
93/// The [`emit(data)`][OutputSubject::emit] function emits data to all trackers,
94/// that have been created for this subject and are not stopped yet.
95#[derive(Default, Debug, Clone)]
96pub struct OutputSubject<M> {
97    inner: ThreadsafeSubject<M>,
98}
99
100impl<M> OutputSubject<M> {
101    /// Constructs a new [`OutputSubject`].
102    ///
103    /// A new subject does nothing unless one or more trackers have been
104    /// created.
105    #[must_use]
106    pub fn new() -> Self {
107        Self {
108            inner: ThreadsafeSubject::new(),
109        }
110    }
111}
112
113impl<M> OutputSubject<M>
114where
115    M: Clone,
116{
117    /// Creates a new [`OutputTracker`] and registers it to be ready to track
118    /// emitted data.
119    pub fn create_tracker(&self) -> Result<OutputTracker<M>, Error> {
120        let new_tracker = ThreadsafeTracker::new();
121        let handle = self.inner.add_tracker(new_tracker.clone())?;
122        Ok(OutputTracker::new(handle, new_tracker, self.inner.clone()))
123    }
124
125    /// Emits given data to all active [`OutputTracker`]s.
126    ///
127    /// Stopped [`OutputTracker`]s do not receive any emitted data.
128    pub fn emit(&self, data: M) -> Result<(), Error> {
129        self.inner.emit(data)
130    }
131}
132
133#[derive(Default, Debug, Clone)]
134struct ThreadsafeSubject<M> {
135    cell: Arc<Mutex<BasicSubject<M, ThreadsafeTracker<M>>>>,
136}
137
138impl<M> ThreadsafeSubject<M> {
139    fn new() -> Self {
140        Self {
141            cell: Arc::new(Mutex::new(BasicSubject::new())),
142        }
143    }
144}
145
146impl<M> CelledSubject<M, ThreadsafeTracker<M>> for ThreadsafeSubject<M> {
147    type Inner<'a>
148        = MutexGuard<'a, BasicSubject<M, ThreadsafeTracker<M>>>
149    where
150        Self: 'a;
151    type InnerMut<'a>
152        = MutexGuard<'a, BasicSubject<M, ThreadsafeTracker<M>>>
153    where
154        Self: 'a;
155    type Error = Error;
156
157    fn subject(&self) -> Result<Self::Inner<'_>, Error> {
158        loop {
159            match self.cell.try_lock() {
160                Ok(subject) => return Ok(subject),
161                Err(TryLockError::WouldBlock) => {
162                    // try again
163                },
164                Err(TryLockError::Poisoned(_)) => return Err(Error::LockSubjectFailed),
165            }
166        }
167    }
168
169    fn subject_mut(&self) -> Result<Self::InnerMut<'_>, Error> {
170        self.subject()
171    }
172}
173
174#[derive(Debug, Clone)]
175struct ThreadsafeTracker<M> {
176    cell: Arc<Mutex<BasicTracker<M>>>,
177}
178
179impl<M> CelledTracker<M> for ThreadsafeTracker<M> {
180    type Inner<'a>
181        = MutexGuard<'a, BasicTracker<M>>
182    where
183        M: 'a;
184    type InnerMut<'a>
185        = MutexGuard<'a, BasicTracker<M>>
186    where
187        M: 'a;
188    type Error = Error;
189
190    fn new() -> Self {
191        Self {
192            cell: Arc::new(Mutex::new(BasicTracker::new())),
193        }
194    }
195
196    fn tracker(&self) -> Result<Self::Inner<'_>, Self::Error> {
197        loop {
198            match self.cell.try_lock() {
199                Ok(tracker) => {
200                    return Ok(tracker);
201                },
202                Err(TryLockError::WouldBlock) => {
203                    // try again
204                },
205                Err(TryLockError::Poisoned(_)) => return Err(Error::LockTrackerFailed),
206            }
207        }
208    }
209
210    fn tracker_mut(&self) -> Result<Self::InnerMut<'_>, Self::Error> {
211        self.tracker()
212    }
213}
214
215#[cfg(test)]
216mod tests;