// parallel_task/accessors/read_accessor.rs

//! Accessors consist of a primary and a secondary accessor. A limited-access queue never gives direct
//! access to the queue itself; instead it hands out the primary and secondary accessors, which may then be
//! moved into separate threads to manage access to the queue. The primary has the additional ability to
//! steal; the secondary cannot steal. If the primary pushes a new set of tasks and changes the status, the
//! secondary can pull the same on command. The reason for having just two accessors is to create a
//! synchronised, atomics-based management of the queue across threads. The accessors are inherently fast
//! compared to channels and do not engage locks. The accessors cannot be cloned.
8
9use std::{ops::{Deref, DerefMut}, sync::{atomic::AtomicPtr, Arc}};
10
11use crate::{accessors::limit_queue::LimitAccessQueue};
12
13
14/// Adds a primary and secondary accessor to easily differentiate the read accessors during usage
15/// and at the time of creation. These are just covers on the read accessors as they implement deref and deref mut to enable
16/// the access to read accessor functions.
17macro_rules! readaccessorref {
18    ($($RdAc:ident),*) => {
19        $(  
20            pub struct $RdAc<T,State>(ReadAccessor<T,State>);
21
22            impl<T,State> $RdAc<T,State> {
23                pub fn new(obj:ReadAccessor<T,State>) -> Self {
24                    Self(obj)
25                }
26            } 
27
28            impl<T,State> Deref for $RdAc<T,State> {
29                type Target = ReadAccessor<T,State>;
30
31                fn deref(&self) -> &Self::Target {
32                    &self.0
33                }
34            }
35
36            impl<T,State> DerefMut for $RdAc<T,State> {    
37            
38                fn deref_mut(&mut self) -> &mut Self::Target {
39                    &mut self.0
40                }
41            }
42        )*        
43    };
44}
45
46readaccessorref!(PrimaryAccessor, SecondaryAccessor);
47
48
/// `QueuePtr` accounts for the heap allocation backing the queue (captured within an
/// `AtomicPtr`) and ensures the shared pointer is invalidated when this wrapper is dropped.
/// `QueuePtr` is purposefully kept private and inaccessible; via `Deref`/`DerefMut` the
/// access to it is disguised within the `ReadAccessor`s.
struct QueuePtr<T,State> {
    // Shared atomic view of the queue's address; nulled in this type's `Drop` impl.
    ptr:Arc<AtomicPtr<LimitAccessQueue<T,State>>>,
    // Held only to keep the queue's Arc (and therefore its allocation) alive for the
    // lifetime of this wrapper; never read directly, hence the dead_code allowance.
    #[allow(dead_code)]
    owner:Arc<LimitAccessQueue<T,State>>
}
58
59impl<T,State> QueuePtr<T,State> {
60    fn new(ptr:Arc<AtomicPtr<LimitAccessQueue<T,State>>>, owner:Arc<LimitAccessQueue<T,State>>) -> Self {
61        Self {
62            ptr,
63            owner
64        }
65    }
66}
67
68impl<T,State> Deref for QueuePtr<T,State> {
69    type Target = Arc<AtomicPtr<LimitAccessQueue<T,State>>>;
70
71    fn deref(&self) -> &Self::Target {
72        &self.ptr
73    }
74}
75
76impl<T,State> DerefMut for QueuePtr<T,State> {    
77    
78    fn deref_mut(&mut self) -> &mut Self::Target {
79        &mut self.ptr
80    }
81}
82
impl<T,State> Drop for QueuePtr<T,State> {
    fn drop(&mut self) {
        // Null out the shared pointer with Release ordering so any reader that
        // subsequently loads it with Acquire (see `ReadAccessor::as_ptr`) observes the
        // null and stops handing out references to the soon-to-be-freed queue.
        // NOTE(review): `ReadAccessor::new` builds a fresh `Arc<AtomicPtr>` per accessor,
        // so this null is only visible through clones of *this* Arc — confirm accessors
        // actually share one QueuePtr wherever cross-accessor invalidation is relied on.
        self.ptr.store(std::ptr::null_mut(), std::sync::atomic::Ordering::Release);
    }
}
89
/// Distinguishes the two accessor roles: `Primary` may steal tasks, `Secondary` may not.
/// Derives are eager (Debug/Clone/Copy/Eq added alongside the original PartialEq) so the
/// role can be logged, copied into closures, and compared without restrictions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadAccessorType {
    Primary,
    Secondary
}
95
/// The accessor handed out over a `LimitAccessQueue`: couples the queue pointer with the
/// role (`Primary`/`Secondary`) that gates the steal operations.
pub struct ReadAccessor<T,State> 
{
    // Pointer wrapper that both reaches the queue and keeps its allocation alive.
    val: QueuePtr<T,State>,
    // Role of this accessor; consulted by `steal`, `steal_half` and `is_primary`.
    rtype: ReadAccessorType,
}
101
102#[allow(dead_code)]
103impl<T,State> ReadAccessor<T,State> 
104where State: Clone + Default
105{
106    pub fn new(owner:Arc<LimitAccessQueue<T,State>>, rtype:ReadAccessorType) -> Self {
107        let arc_ptr = Arc::as_ptr(&owner) as *mut LimitAccessQueue<T,State>;
108        let obj = Arc::new(AtomicPtr::new(arc_ptr));
109        let val = QueuePtr::new(obj, owner);
110        Self {
111            val,
112            rtype
113        }
114    }
115
116    //Checks if pointer is null before sharing the same within an Option
117    fn as_ptr(&self) -> Option<*mut LimitAccessQueue<T,State>> {        
118        let ptr = self.val.load(std::sync::atomic::Ordering::Acquire);
119        if ptr.is_null() {
120            None
121        } else {
122            Some(ptr)                                  
123        }           
124    }
125
126    fn get_ref(&self) -> Option<&LimitAccessQueue<T,State>> {
127        unsafe {  
128            if let Some(ptr_ref) = self.as_ptr() {
129                let opt_ptr = (ptr_ref).as_ref();
130                if let Some(ptr) = opt_ptr {
131                    return Some(ptr);
132                }                
133            }          
134            None
135        }       
136    }
137
138    fn get_mut(&self) -> Option<&mut LimitAccessQueue<T,State>> {
139        unsafe {  
140            if let Some(ptr_ref) = self.as_ptr() {
141                (ptr_ref).as_mut()                    
142            }  else {
143                None
144            }                    
145        }       
146    }
147
148    fn within_mutable_block<F,Output>(&self,f:F) -> Option<Output>
149    where F: FnOnce(&mut LimitAccessQueue<T,State>) -> Option<Output> {
150
151        if let Some(ptr) = self.get_mut() {
152            f(ptr)                       
153        } else {
154            None
155        }        
156    }       
157
158    pub fn is_primary(&self) -> bool {
159        self.rtype == ReadAccessorType::Primary
160    }
161
162    pub fn pop(&self) -> Option<T> {  
163        self.within_mutable_block(|l| l.pop())                                                                     
164    }
165
166    pub fn is_empty(&self) -> bool {
167        self.within_mutable_block(|l| Some(l.is_empty())).unwrap_or(true)
168    }
169
170    pub fn len(&self) -> usize {
171        self.within_mutable_block(|l| Some(l.val.len())).unwrap_or_default()        
172    }
173
174
175    pub fn write(&self, values:Vec<T>) -> Result<bool,bool> {
176        self.within_mutable_block(|l| {
177            l.write(values);
178            Some(Ok(true))
179        }).unwrap_or(Err(false))               
180    }
181        
182    pub fn replace(&self, values:Vec<T>) -> Result<bool,bool> { 
183        self.within_mutable_block(|l| {
184            l.replace(values);
185            Some(Ok(true))
186        }).unwrap_or(Err(false))                   
187    }
188
189    pub fn is_write_blocked(&self) -> bool {
190        if let Some(obj) = self.get_ref() {
191            obj.is_write_blocked()
192        } else {
193            true
194        }
195    }
196
197    pub fn steal(&mut self) -> Option<Vec<T>> {
198        match self.rtype {
199            ReadAccessorType::Secondary => {
200                None
201            }
202            ReadAccessorType::Primary => { 
203                
204                self.within_mutable_block(|l| l.steal())               
205            }
206        }
207    }
208
209    pub fn steal_half(&mut self) -> Option<Vec<T>> {
210        match self.rtype {
211            ReadAccessorType::Secondary => {
212                None
213            }
214            ReadAccessorType::Primary => { 
215                self.within_mutable_block(|l| l.steal_half())               
216            }
217        }
218    }
219
220    pub fn set_state(&mut self, state:State) {
221        self.within_mutable_block(|l| {
222            l.set_state(state);
223            Some(())
224        });
225    }
226
227    pub fn state(&mut self) -> State {
228        self.within_mutable_block(|l| Some(l.get_state())).unwrap_or_default()        
229    }
230    
231}