parallel_task/accessors/
read_accessor.rs1use std::{ops::{Deref, DerefMut}, sync::{atomic::AtomicPtr, Arc}};
10
11use crate::{accessors::limit_queue::LimitAccessQueue};
12
13
14macro_rules! readaccessorref {
18 ($($RdAc:ident),*) => {
19 $(
20 pub struct $RdAc<T,State>(ReadAccessor<T,State>);
21
22 impl<T,State> $RdAc<T,State> {
23 pub fn new(obj:ReadAccessor<T,State>) -> Self {
24 Self(obj)
25 }
26 }
27
28 impl<T,State> Deref for $RdAc<T,State> {
29 type Target = ReadAccessor<T,State>;
30
31 fn deref(&self) -> &Self::Target {
32 &self.0
33 }
34 }
35
36 impl<T,State> DerefMut for $RdAc<T,State> {
37
38 fn deref_mut(&mut self) -> &mut Self::Target {
39 &mut self.0
40 }
41 }
42 )*
43 };
44}
45
46readaccessorref!(PrimaryAccessor, SecondaryAccessor);
47
48
49struct QueuePtr<T,State> {
54 ptr:Arc<AtomicPtr<LimitAccessQueue<T,State>>>,
55 #[allow(dead_code)]
56 owner:Arc<LimitAccessQueue<T,State>>
57}
58
59impl<T,State> QueuePtr<T,State> {
60 fn new(ptr:Arc<AtomicPtr<LimitAccessQueue<T,State>>>, owner:Arc<LimitAccessQueue<T,State>>) -> Self {
61 Self {
62 ptr,
63 owner
64 }
65 }
66}
67
68impl<T,State> Deref for QueuePtr<T,State> {
69 type Target = Arc<AtomicPtr<LimitAccessQueue<T,State>>>;
70
71 fn deref(&self) -> &Self::Target {
72 &self.ptr
73 }
74}
75
76impl<T,State> DerefMut for QueuePtr<T,State> {
77
78 fn deref_mut(&mut self) -> &mut Self::Target {
79 &mut self.ptr
80 }
81}
82
83impl<T,State> Drop for QueuePtr<T,State> {
84 fn drop(&mut self) {
85
86 self.ptr.store(std::ptr::null_mut(), std::sync::atomic::Ordering::Release);
87 }
88}
89
/// Role tag carried by a `ReadAccessor`.
///
/// The role gates the stealing operations: `ReadAccessor::steal` and
/// `ReadAccessor::steal_half` forward to the queue only for `Primary` and
/// return `None` for `Secondary`.
///
/// The enum is a plain field-less tag, so it derives the full set of cheap
/// common traits (`Debug`, `Clone`, `Copy`, `PartialEq`, `Eq`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadAccessorType {
    /// Accessor that is allowed to steal from the queue.
    Primary,
    /// Accessor whose steal requests always yield `None`.
    Secondary,
}
95
96pub struct ReadAccessor<T,State>
97{
98 val: QueuePtr<T,State>,
99 rtype: ReadAccessorType,
100}
101
102#[allow(dead_code)]
103impl<T,State> ReadAccessor<T,State>
104where State: Clone + Default
105{
106 pub fn new(owner:Arc<LimitAccessQueue<T,State>>, rtype:ReadAccessorType) -> Self {
107 let arc_ptr = Arc::as_ptr(&owner) as *mut LimitAccessQueue<T,State>;
108 let obj = Arc::new(AtomicPtr::new(arc_ptr));
109 let val = QueuePtr::new(obj, owner);
110 Self {
111 val,
112 rtype
113 }
114 }
115
116 fn as_ptr(&self) -> Option<*mut LimitAccessQueue<T,State>> {
118 let ptr = self.val.load(std::sync::atomic::Ordering::Acquire);
119 if ptr.is_null() {
120 None
121 } else {
122 Some(ptr)
123 }
124 }
125
126 fn get_ref(&self) -> Option<&LimitAccessQueue<T,State>> {
127 unsafe {
128 if let Some(ptr_ref) = self.as_ptr() {
129 let opt_ptr = (ptr_ref).as_ref();
130 if let Some(ptr) = opt_ptr {
131 return Some(ptr);
132 }
133 }
134 None
135 }
136 }
137
138 fn get_mut(&self) -> Option<&mut LimitAccessQueue<T,State>> {
139 unsafe {
140 if let Some(ptr_ref) = self.as_ptr() {
141 (ptr_ref).as_mut()
142 } else {
143 None
144 }
145 }
146 }
147
148 fn within_mutable_block<F,Output>(&self,f:F) -> Option<Output>
149 where F: FnOnce(&mut LimitAccessQueue<T,State>) -> Option<Output> {
150
151 if let Some(ptr) = self.get_mut() {
152 f(ptr)
153 } else {
154 None
155 }
156 }
157
158 pub fn is_primary(&self) -> bool {
159 self.rtype == ReadAccessorType::Primary
160 }
161
162 pub fn pop(&self) -> Option<T> {
163 self.within_mutable_block(|l| l.pop())
164 }
165
166 pub fn is_empty(&self) -> bool {
167 self.within_mutable_block(|l| Some(l.is_empty())).unwrap_or(true)
168 }
169
170 pub fn len(&self) -> usize {
171 self.within_mutable_block(|l| Some(l.val.len())).unwrap_or_default()
172 }
173
174
175 pub fn write(&self, values:Vec<T>) -> Result<bool,bool> {
176 self.within_mutable_block(|l| {
177 l.write(values);
178 Some(Ok(true))
179 }).unwrap_or(Err(false))
180 }
181
182 pub fn replace(&self, values:Vec<T>) -> Result<bool,bool> {
183 self.within_mutable_block(|l| {
184 l.replace(values);
185 Some(Ok(true))
186 }).unwrap_or(Err(false))
187 }
188
189 pub fn is_write_blocked(&self) -> bool {
190 if let Some(obj) = self.get_ref() {
191 obj.is_write_blocked()
192 } else {
193 true
194 }
195 }
196
197 pub fn steal(&mut self) -> Option<Vec<T>> {
198 match self.rtype {
199 ReadAccessorType::Secondary => {
200 None
201 }
202 ReadAccessorType::Primary => {
203
204 self.within_mutable_block(|l| l.steal())
205 }
206 }
207 }
208
209 pub fn steal_half(&mut self) -> Option<Vec<T>> {
210 match self.rtype {
211 ReadAccessorType::Secondary => {
212 None
213 }
214 ReadAccessorType::Primary => {
215 self.within_mutable_block(|l| l.steal_half())
216 }
217 }
218 }
219
220 pub fn set_state(&mut self, state:State) {
221 self.within_mutable_block(|l| {
222 l.set_state(state);
223 Some(())
224 });
225 }
226
227 pub fn state(&mut self) -> State {
228 self.within_mutable_block(|l| Some(l.get_state())).unwrap_or_default()
229 }
230
231}