hubs/
lib.rs

1//! The Horribly Unsafe Buffer Structure.
2//!
3//! A Data Structure that allows for fast access to pre-allocated data in chunks and allows read-access to all currently comitted chunks in one call.
4//!
5//! This is not a general ourpose data structure, if you attempt it to use it as such, it might yield terrible performance.
6//! This crate was made for slow-ticking game loops, so that one tick every 20ms or so can easily read hundreds of thousands of items with two atomic operations.
7//! Refer to [Hubs] to get started.
8
9
10use std::{cell::UnsafeCell, marker::PhantomData, sync::{Arc, atomic::{ AtomicUsize, Ordering}}};
11
12const HUBS_SIZE: usize = 256;
13const CHUNK_SIZE: usize = 256;
14
15/**
16The Hubs data structure.
17A `Hubs` holds a list of `Chunk`s.
18Each `Chunk` contains an array of values of `T`.
19
20# Usage
21Operation works as follows:
22- You create the Hubs
23- You split it, to receive the [HubsProducer] and [HubsConsumer]. These can be moved to their target threads.
24- You can write on the [HubsProducer] by mutably borrowing single chunks with [`.borrow_chunk_mut()`]
25  - When writing an element of a chunk, it is your responsibility to set [Chunk::used]
26  - You need to call [`.commit()`]: When a chunk is written, it has to be comitted. This has to be done regardnless whether it is full or not. 
27  - There must be no gaps of valid data in the array within a chunk.
28  Upon commit, the chunk can be read by another thread.
29- To read from the Hubs, you can get a [ChunkBlock] that contains all currently committed chunks.
30  - A [ChunkBlock] provides an iterator over all elements in the arrays in all chunks up to `used`.
31
32
33# Example
34```
35    use hubs::{Hubs, HubsInitializer, HubsWriteAccess};
36
37    let hubs = Hubs::new_default();
38    let (mut tx,rx) = hubs.split();
39
40    // In 7 Chunks, write the first 9 elements
41    for i in 0 .. 7{
42        match tx.borrow_chunk_mut(){
43            Some(guard) => {
44                for k in 0 .. 9 {
45                    guard.chunk.data[k] = i*k as u64;
46                    guard.chunk.used += 1;
47                }
48                guard.commit();
49            },
50            None => panic!("Must not be full")
51        };
52    }
53
54    // Now read everything in one go
55    let mut iter = rx.get_chunks_for_tick().into_iter();
56    for i in 0 .. 7{
57        for k in 0 .. 9{
58            assert_eq!(*iter.next().unwrap(), i*k as u64);
59        }
60    } 
61
62    assert_eq!(iter.next(), None);        
63
64```
65
66  [`.commit()`]: `HubsWriteAccess::commit()`
67  [`.borrow_chunk_mut()`]: `HubsProducer::borrow_chunk_mut()`
68
69
70*/
71pub struct Hubs<T>{
72    inner: HubsInner<T>
73}
74impl<T> Hubs<T> where T: Default{
75
76    /// Create a [Hubs] with a [HubsInitializer] if the item type implements [Default]. 
77    /// Upon initialization, all fields will contain the default value.
78    pub fn new_default() -> Self{
79        let initializer = DefaultInitializer::new();
80        let mut chunks = Vec::with_capacity(128);
81        for _i in 0..HUBS_SIZE{
82            chunks.push(Chunk::new(&initializer));
83        }
84        let chunks = chunks.into_boxed_slice();
85
86        Hubs{
87           inner: HubsInner{
88                chunks: UnsafeCell::from(chunks),
89                read_ptr: AtomicUsize::new(0),
90                read_barrier: AtomicUsize::new(HUBS_SIZE-1),
91                write_ptr: AtomicUsize::new(0),
92                write_barrier: AtomicUsize::new(0),
93                capacity: HUBS_SIZE
94            } 
95        }
96    }
97} 
98
99
100
101impl<T> Hubs<T>{
102
103    /**
104    Creates a new [Hubs] with the given initializer.
105    The created [Hubs] has a fixed capacity to hold Chunks, that cannot be changed after creation.
106    After initialization, you can use [`.split()`] to split the Hubs in a consumer and producer.
107    These can be moved to another thread.
108
109      [`.split()`]: `Hubs::split()`
110
111    */
112    pub fn new(initializer: &dyn HubsInitializer<T=T>) -> Self{
113        Hubs::with_capacity(HUBS_SIZE, initializer)
114    }
115
116    /**
117    The same as [`.new()`] but you can define the capacity upon creation.
118
119    [`.new()`]: `Hubs::new()`
120    */
121    pub fn with_capacity(capacity:usize, initializer: &dyn HubsInitializer<T=T>) -> Self{
122
123        let mut chunks = Vec::with_capacity(128);
124        for _i in 0..capacity{
125            chunks.push(Chunk::new(initializer));
126        }
127        let chunks = chunks.into_boxed_slice();
128
129        Hubs{
130           inner: HubsInner{
131                chunks: UnsafeCell::from(chunks),
132                read_ptr: AtomicUsize::new(0),
133                read_barrier: AtomicUsize::new(capacity-1),
134                write_ptr: AtomicUsize::new(0),
135                write_barrier: AtomicUsize::new(0),
136                capacity
137            } 
138        }
139    }
140
141    /**
142        Take Ownership of the hubs and split it.
143        This is necessary to move one part over to another thread.
144    */
145    pub fn split(self) -> (HubsProducer<T>, HubsConsumer<T>){
146        let inner = Arc::new(self.inner);
147        let tx = HubsProducer{
148            inner: Arc::clone(&inner)
149        };
150        let rx = HubsConsumer{
151            inner: Arc::clone(&inner)
152        };
153        (tx,rx)
154    }
155
156    /**
157        Get the fixed capacity of the hubs.
158        This equals the allocated chunks. Since we need a bit of space between the read and write barrier to handle the wrap around case,
159        you can store one chunk less than the capacity before you need to read from the hubs.
160    */
161    pub fn capacity(&self) -> usize{
162        self.inner.capacity
163    }
164}
165
166/**
167Trait used to fill the empty hubs with default data upon creation.
168See [Hubs::new_default] as an alternative
169Do not be afraid to implement this type, since it is used only once upon initalization.
170*/
171pub trait HubsInitializer{
172    type T;
173    fn initialize_data(&self)-> Self::T;
174}
175
176
177/**
178    A Chunk of Data.
179
180    A `Chunk` contains an array of your desired datatype `T`.
181    The variable `used` has to be set to the amount of valid values in `data`.
182    There must be no gaps in `data` but it does not has to be fully used.
183    A chunk bust be comitted to be readable, see [HubsWriteAccess].
184
185    If you did not manually clear the chunk content, there will be stale data in the chunk from the previous round.
186    This is ok, if you set the `used` variable correcly.
187
188
189*/
190pub struct Chunk<T>{
191    /// The capacity of this chunk, do not change
192    pub capacity: usize,
193    /// Count of used elements in this chunk. Set this before you commit
194    pub used: usize,
195    /// The data in this chunk. Feel free to borrow the data to other functions, e.g. a network receive
196    pub data: Box<[T]>
197}
198impl<T> Chunk<T> {
199    fn new(initializer: &dyn HubsInitializer<T=T>) -> Self{
200        let mut v = Vec::with_capacity(CHUNK_SIZE);
201        for _ in 0 .. CHUNK_SIZE{
202            v.push(initializer.initialize_data());
203        }
204        Chunk{
205            capacity: CHUNK_SIZE,
206            used: 0,
207            data: v.into_boxed_slice(),
208        }
209    }
210}
211
212/**
213    The producer side of the Hubs.
214
215    Use this one for writing into the data structure.
216    See [Hubs] for an overview.
217    The Hubs Producer may be moved around threads.
218    Do not try to do this whilst you borrowed a chunk, I haven't tested that.
219*/
220pub struct HubsProducer<T>{
221    inner: Arc<HubsInner<T>>
222}
223
224/**
225    The consumer side of the Hubs.
226
227    Use this one for reading data from the structure.
228    See [Hubs] for an overview.
229    The Hubs Consumer may be moved around threads.
230    Do not try to do this whilst you borrowed a set of chunks, I haven't tested that.
231
232    To get all committed chunks, call [`.get_chunks_for_tick()`].
233
234    You can not get only a part of these chunks.
235    If you do not read all chunks retrieved in one read call, they are lost.
236
237    [`.get_chunks_for_tick()`]: `HubsConsumer::get_chunks_for_tick`
238*/
239pub struct HubsConsumer<T>{
240    inner: Arc<HubsInner<T>>
241}
242
243
244impl<T> HubsConsumer<T>{
245    /**
246        Gives you all currently committed Chunks in a [ChunkBlock].
247        Once given out, it is your responsibility to either process them or allow them to be lost.
248    */
249    pub fn get_chunks_for_tick(&self) -> ChunkBlock<T>{
250        self.inner.get_read_chunks_current()
251    }
252}
253
254impl<T> HubsProducer<T>{
255    /**
256        Borrows a [Chunk] from a Hubs.
257        You must give it back by calling [`.commit()`].
258        There can only be one single chunk given out at any time.
259
260        If the Hubs is full, this returns `None`.
261
262        [`.commit()`]: `HubsWriteAccess::commit()`
263    */
264    pub fn borrow_chunk_mut(&mut self) -> Option<HubsWriteAccess<T>>{
265        self.inner.borrow_chunk_mut()
266    }
267}
268
269unsafe impl<T: Send> Send for HubsInner<T> {}
270unsafe impl<T: Send> Sync for HubsInner<T> {}
271
272struct HubsInner<T>{
273    chunks: UnsafeCell<Box<[Chunk<T>]>>,
274    read_barrier: AtomicUsize,
275    read_ptr: AtomicUsize,
276    write_ptr: AtomicUsize,
277    write_barrier: AtomicUsize,
278    capacity: usize
279}
280
281
282impl<T> HubsInner<T>{
283    fn get_read_chunks_current(&self) -> ChunkBlock<T>{
284
285        let read_end = self.write_barrier.load(Ordering::SeqCst);
286        let read_start = self.read_ptr.load(Ordering::SeqCst);
287
288        if read_start == read_end {
289            return ChunkBlock::empty()
290        }
291        
292        self.read_ptr.store(read_end, Ordering::SeqCst);
293        
294
295        // this is tricky, we need to close the ring
296        let chunks = if read_start > read_end {
297            ChunkBlockData::Two(
298                unsafe{ &(*self.chunks.get())[read_start..] },
299                unsafe{ &(*self.chunks.get())[..read_end] }
300            )
301        }
302        else{
303            ChunkBlockData::One(unsafe{ &(*self.chunks.get())[read_start..read_end] })
304        };
305
306        ChunkBlock::new(chunks, &self)
307    }
308
309    fn borrow_chunk_mut(&self) -> Option<HubsWriteAccess<T>>{
310
311        let write_pos = self.write_ptr.load(Ordering::SeqCst);        
312        let write_barrier = self.write_barrier.load(Ordering::SeqCst);
313
314        if write_pos!=write_barrier {
315            panic!("Cant borrow more than one chunk")
316        }
317
318        let read_barrier = self.read_barrier.load(Ordering::SeqCst);
319
320        if read_barrier == write_pos{
321            return None;
322        }
323
324        let next_write_pos = (write_pos + 1) % self.capacity;
325
326        self.write_ptr.store( next_write_pos, Ordering::SeqCst);
327
328        /*
329         SAFETY:
330
331         */
332        let chunk = unsafe{ &mut(*self.chunks.get())[write_pos] };
333        Some(HubsWriteAccess{
334            chunk,
335            parent: self,
336        })
337    }
338
339    fn return_chunk_block(&self, _block: &mut ChunkBlock<T>){
340
341        let read_end = self.read_barrier.load(Ordering::SeqCst);
342        let mut read_ptr = self.read_ptr.load(Ordering::SeqCst);
343
344        read_ptr =  ( self.capacity + read_ptr - 1 ) % self.capacity;
345
346        if read_ptr == read_end {
347            panic!("Tried to return block to hubs that has no block given out")
348        }
349
350        self.read_barrier.store(read_ptr, Ordering::SeqCst);
351    }
352
353    fn commit_chunk(&self, _write_access: HubsWriteAccess<T>){
354        let write_pos = self.write_ptr.load(Ordering::SeqCst);        
355        let mut write_barrier = self.write_barrier.load(Ordering::SeqCst);
356        write_barrier = (write_barrier + 1) % self.capacity;
357        if write_pos == write_barrier {
358            self.write_barrier.store(write_barrier, Ordering::SeqCst);
359        }
360        else  {
361            panic!("Cant commit old chunk if already borrowed new chunk")
362        }
363    }
364}
365
366
367/// A Block of Chunks
368///
369/// Acess via its Iterator
370pub struct ChunkBlock<'a,T> {
371    chunks: ChunkBlockData<'a, T>,
372    parent: Option<&'a HubsInner<T>>,
373    current_chunk_index: usize,
374    in_chunk_index: usize
375}
376
377impl <'a,T> Iterator for ChunkBlock<'a,T>{
378    type Item = &'a T;
379
380    fn next(&mut self) -> Option<Self::Item> {
381        if let Some(chunk) = self.current_chunk(){
382            if chunk.used > self.in_chunk_index{
383                let data = &chunk.data[self.in_chunk_index];
384                self.in_chunk_index += 1;
385                return Some(data)
386            }
387            else{
388                // the current chunk did not hold more data, let's retry with the next chunk
389                self.current_chunk_index += 1;
390                self.in_chunk_index = 0;
391                if let Some(chunk) = self.current_chunk(){
392                    if chunk.used > self.in_chunk_index{
393                        let data = &chunk.data[self.in_chunk_index];
394                        self.in_chunk_index += 1;
395                        return Some(data)
396                    }
397                }
398            }
399        }
400        None               
401    }    
402}
403
404
405impl <'a,T> ChunkBlock<'a,T>{
406
407    fn new( chunks: ChunkBlockData<'a, T>, parent: &'a HubsInner<T>) -> Self{
408        ChunkBlock{
409            chunks, 
410            parent: Some(parent),
411            current_chunk_index: 0,
412            in_chunk_index: 0
413        }
414
415    }
416
417    fn empty() -> Self{
418        ChunkBlock{
419            chunks: ChunkBlockData::None, 
420            parent: None,
421            current_chunk_index: 0,
422            in_chunk_index: 0
423        }
424
425    }
426
427    fn current_chunk(&self, ) -> Option<&'a Chunk<T>>{
428        let index = self.current_chunk_index;
429        match self.chunks {
430            ChunkBlockData::One(block) => {
431                if block.len() > index{
432                    Some(&block[index])
433                }
434                else{
435                    return None
436                }
437            },
438            ChunkBlockData::Two(block_0, block_1) => {
439                if block_0.len() > index{
440                    Some(&block_0[index])
441                }
442                else if block_1.len() > index - block_0.len(){
443                    let index = index - block_0.len();
444                    Some(&block_1[index])
445                }
446                else{
447                    return None
448                }
449            },
450            ChunkBlockData::None => None
451        }
452    }
453
454}
455
456
457enum ChunkBlockData<'a,T>{
458    One(&'a [Chunk<T>]),
459    Two(&'a [Chunk<T>], &'a [Chunk<T>]),
460    None
461}
462
463
464impl <'a,T> Drop for ChunkBlock<'a,T>{
465    fn drop(&mut self) {
466        match self.parent{
467            Some(parent) =>parent.return_chunk_block(self),
468            None => ()
469        }
470    }
471}
472
473/**
474Some sort of write guard for a single chunk.
475
476You need to call [`.commit()`] when you are done.
477This returns the chunk to the Hubs and will enable it to be read.
478
479You may access the internal data via `chunk`. See the documentation in [Chunk] for more details
480
481[`.commit()`]: `HubsWriteAccess::commit()`
482
483*/
484pub struct HubsWriteAccess<'a,T> {
485    pub chunk: &'a mut Chunk<T>,
486    parent: &'a HubsInner<T>
487}
488
489impl <'a,T> HubsWriteAccess<'a,T>{
490    /**
491    Commits this chunk, making it available to be read and allows the Hubs to give you a new one.
492    - You **need** to call this function if you borrowed a chunk.
493    - You **need** to call this function before you borrow the next chunk.
494    If you fail to do so, the following will happen: Bad things.
495    */
496    pub fn commit(self) {
497        self.parent.commit_chunk(self);
498    }
499}
500
501struct DefaultInitializer<T>{_data: PhantomData<T>}
502impl<T : Default> DefaultInitializer<T>{
503    fn new() -> Self{
504        DefaultInitializer{
505            _data : PhantomData
506        }
507    }
508}
509
510impl<T : Default> HubsInitializer for DefaultInitializer<T>{
511    type T=T;
512    fn initialize_data(&self)-> Self::T {
513        T::default()
514    }
515}