output_tracker/threadsafe/
mod.rs1use crate::inner_subject::{BasicSubject, CelledSubject};
6use crate::inner_tracker::{BasicTracker, CelledTracker};
7use crate::tracker_handle::TrackerHandle;
8use std::sync::{Arc, Mutex, MutexGuard, TryLockError};
9
10#[derive(thiserror::Error, Debug)]
12pub enum Error {
13 #[error("failed to obtain a lock for the tracker")]
15 LockTrackerFailed,
16 #[error("failed to obtain a lock for the subject")]
18 LockSubjectFailed,
19}
20
21#[derive(Debug)]
35pub struct OutputTracker<M> {
36 handle: TrackerHandle,
37 inner: ThreadsafeTracker<M>,
38 subject: ThreadsafeSubject<M>,
39}
40
41impl<M> OutputTracker<M> {
42 const fn new(
43 handle: TrackerHandle,
44 inner: ThreadsafeTracker<M>,
45 subject: ThreadsafeSubject<M>,
46 ) -> Self {
47 Self {
48 handle,
49 inner,
50 subject,
51 }
52 }
53
54 pub fn stop(&self) -> Result<(), Error> {
59 self.subject.remove_tracker(self.handle)
60 }
61
62 pub fn clear(&self) -> Result<(), Error> {
67 self.inner.clear()
68 }
69
70 pub fn output(&self) -> Result<Vec<M>, Error>
79 where
80 M: Clone,
81 {
82 self.inner.output()
83 }
84}
85
86#[derive(Default, Debug, Clone)]
96pub struct OutputSubject<M> {
97 inner: ThreadsafeSubject<M>,
98}
99
100impl<M> OutputSubject<M> {
101 #[must_use]
106 pub fn new() -> Self {
107 Self {
108 inner: ThreadsafeSubject::new(),
109 }
110 }
111}
112
113impl<M> OutputSubject<M>
114where
115 M: Clone,
116{
117 pub fn create_tracker(&self) -> Result<OutputTracker<M>, Error> {
120 let new_tracker = ThreadsafeTracker::new();
121 let handle = self.inner.add_tracker(new_tracker.clone())?;
122 Ok(OutputTracker::new(handle, new_tracker, self.inner.clone()))
123 }
124
125 pub fn emit(&self, data: M) -> Result<(), Error> {
129 self.inner.emit(data)
130 }
131}
132
133#[derive(Default, Debug, Clone)]
134struct ThreadsafeSubject<M> {
135 cell: Arc<Mutex<BasicSubject<M, ThreadsafeTracker<M>>>>,
136}
137
138impl<M> ThreadsafeSubject<M> {
139 fn new() -> Self {
140 Self {
141 cell: Arc::new(Mutex::new(BasicSubject::new())),
142 }
143 }
144}
145
146impl<M> CelledSubject<M, ThreadsafeTracker<M>> for ThreadsafeSubject<M> {
147 type Inner<'a>
148 = MutexGuard<'a, BasicSubject<M, ThreadsafeTracker<M>>>
149 where
150 Self: 'a;
151 type InnerMut<'a>
152 = MutexGuard<'a, BasicSubject<M, ThreadsafeTracker<M>>>
153 where
154 Self: 'a;
155 type Error = Error;
156
157 fn subject(&self) -> Result<Self::Inner<'_>, Error> {
158 loop {
159 match self.cell.try_lock() {
160 Ok(subject) => return Ok(subject),
161 Err(TryLockError::WouldBlock) => {
162 },
164 Err(TryLockError::Poisoned(_)) => return Err(Error::LockSubjectFailed),
165 }
166 }
167 }
168
169 fn subject_mut(&self) -> Result<Self::InnerMut<'_>, Error> {
170 self.subject()
171 }
172}
173
174#[derive(Debug, Clone)]
175struct ThreadsafeTracker<M> {
176 cell: Arc<Mutex<BasicTracker<M>>>,
177}
178
179impl<M> CelledTracker<M> for ThreadsafeTracker<M> {
180 type Inner<'a>
181 = MutexGuard<'a, BasicTracker<M>>
182 where
183 M: 'a;
184 type InnerMut<'a>
185 = MutexGuard<'a, BasicTracker<M>>
186 where
187 M: 'a;
188 type Error = Error;
189
190 fn new() -> Self {
191 Self {
192 cell: Arc::new(Mutex::new(BasicTracker::new())),
193 }
194 }
195
196 fn tracker(&self) -> Result<Self::Inner<'_>, Self::Error> {
197 loop {
198 match self.cell.try_lock() {
199 Ok(tracker) => {
200 return Ok(tracker);
201 },
202 Err(TryLockError::WouldBlock) => {
203 },
205 Err(TryLockError::Poisoned(_)) => return Err(Error::LockTrackerFailed),
206 }
207 }
208 }
209
210 fn tracker_mut(&self) -> Result<Self::InnerMut<'_>, Self::Error> {
211 self.tracker()
212 }
213}
214
215#[cfg(test)]
216mod tests;