hubs/lib.rs
1//! The Horribly Unsafe Buffer Structure.
2//!
3//! A Data Structure that allows for fast access to pre-allocated data in chunks and allows read-access to all currently comitted chunks in one call.
4//!
5//! This is not a general ourpose data structure, if you attempt it to use it as such, it might yield terrible performance.
6//! This crate was made for slow-ticking game loops, so that one tick every 20ms or so can easily read hundreds of thousands of items with two atomic operations.
7//! Refer to [Hubs] to get started.
8
9
10use std::{cell::UnsafeCell, marker::PhantomData, sync::{Arc, atomic::{ AtomicUsize, Ordering}}};
11
12const HUBS_SIZE: usize = 256;
13const CHUNK_SIZE: usize = 256;
14
15/**
16The Hubs data structure.
17A `Hubs` holds a list of `Chunk`s.
18Each `Chunk` contains an array of values of `T`.
19
20# Usage
21Operation works as follows:
22- You create the Hubs
23- You split it, to receive the [HubsProducer] and [HubsConsumer]. These can be moved to their target threads.
24- You can write on the [HubsProducer] by mutably borrowing single chunks with [`.borrow_chunk_mut()`]
25 - When writing an element of a chunk, it is your responsibility to set [Chunk::used]
26 - You need to call [`.commit()`]: When a chunk is written, it has to be comitted. This has to be done regardnless whether it is full or not.
27 - There must be no gaps of valid data in the array within a chunk.
28 Upon commit, the chunk can be read by another thread.
29- To read from the Hubs, you can get a [ChunkBlock] that contains all currently committed chunks.
30 - A [ChunkBlock] provides an iterator over all elements in the arrays in all chunks up to `used`.
31
32
33# Example
34```
35 use hubs::{Hubs, HubsInitializer, HubsWriteAccess};
36
37 let hubs = Hubs::new_default();
38 let (mut tx,rx) = hubs.split();
39
40 // In 7 Chunks, write the first 9 elements
41 for i in 0 .. 7{
42 match tx.borrow_chunk_mut(){
43 Some(guard) => {
44 for k in 0 .. 9 {
45 guard.chunk.data[k] = i*k as u64;
46 guard.chunk.used += 1;
47 }
48 guard.commit();
49 },
50 None => panic!("Must not be full")
51 };
52 }
53
54 // Now read everything in one go
55 let mut iter = rx.get_chunks_for_tick().into_iter();
56 for i in 0 .. 7{
57 for k in 0 .. 9{
58 assert_eq!(*iter.next().unwrap(), i*k as u64);
59 }
60 }
61
62 assert_eq!(iter.next(), None);
63
64```
65
66 [`.commit()`]: `HubsWriteAccess::commit()`
67 [`.borrow_chunk_mut()`]: `HubsProducer::borrow_chunk_mut()`
68
69
70*/
71pub struct Hubs<T>{
72 inner: HubsInner<T>
73}
74impl<T> Hubs<T> where T: Default{
75
76 /// Create a [Hubs] with a [HubsInitializer] if the item type implements [Default].
77 /// Upon initialization, all fields will contain the default value.
78 pub fn new_default() -> Self{
79 let initializer = DefaultInitializer::new();
80 let mut chunks = Vec::with_capacity(128);
81 for _i in 0..HUBS_SIZE{
82 chunks.push(Chunk::new(&initializer));
83 }
84 let chunks = chunks.into_boxed_slice();
85
86 Hubs{
87 inner: HubsInner{
88 chunks: UnsafeCell::from(chunks),
89 read_ptr: AtomicUsize::new(0),
90 read_barrier: AtomicUsize::new(HUBS_SIZE-1),
91 write_ptr: AtomicUsize::new(0),
92 write_barrier: AtomicUsize::new(0),
93 capacity: HUBS_SIZE
94 }
95 }
96 }
97}
98
99
100
101impl<T> Hubs<T>{
102
103 /**
104 Creates a new [Hubs] with the given initializer.
105 The created [Hubs] has a fixed capacity to hold Chunks, that cannot be changed after creation.
106 After initialization, you can use [`.split()`] to split the Hubs in a consumer and producer.
107 These can be moved to another thread.
108
109 [`.split()`]: `Hubs::split()`
110
111 */
112 pub fn new(initializer: &dyn HubsInitializer<T=T>) -> Self{
113 Hubs::with_capacity(HUBS_SIZE, initializer)
114 }
115
116 /**
117 The same as [`.new()`] but you can define the capacity upon creation.
118
119 [`.new()`]: `Hubs::new()`
120 */
121 pub fn with_capacity(capacity:usize, initializer: &dyn HubsInitializer<T=T>) -> Self{
122
123 let mut chunks = Vec::with_capacity(128);
124 for _i in 0..capacity{
125 chunks.push(Chunk::new(initializer));
126 }
127 let chunks = chunks.into_boxed_slice();
128
129 Hubs{
130 inner: HubsInner{
131 chunks: UnsafeCell::from(chunks),
132 read_ptr: AtomicUsize::new(0),
133 read_barrier: AtomicUsize::new(capacity-1),
134 write_ptr: AtomicUsize::new(0),
135 write_barrier: AtomicUsize::new(0),
136 capacity
137 }
138 }
139 }
140
141 /**
142 Take Ownership of the hubs and split it.
143 This is necessary to move one part over to another thread.
144 */
145 pub fn split(self) -> (HubsProducer<T>, HubsConsumer<T>){
146 let inner = Arc::new(self.inner);
147 let tx = HubsProducer{
148 inner: Arc::clone(&inner)
149 };
150 let rx = HubsConsumer{
151 inner: Arc::clone(&inner)
152 };
153 (tx,rx)
154 }
155
156 /**
157 Get the fixed capacity of the hubs.
158 This equals the allocated chunks. Since we need a bit of space between the read and write barrier to handle the wrap around case,
159 you can store one chunk less than the capacity before you need to read from the hubs.
160 */
161 pub fn capacity(&self) -> usize{
162 self.inner.capacity
163 }
164}
165
166/**
167Trait used to fill the empty hubs with default data upon creation.
168See [Hubs::new_default] as an alternative
169Do not be afraid to implement this type, since it is used only once upon initalization.
170*/
171pub trait HubsInitializer{
172 type T;
173 fn initialize_data(&self)-> Self::T;
174}
175
176
177/**
178 A Chunk of Data.
179
180 A `Chunk` contains an array of your desired datatype `T`.
181 The variable `used` has to be set to the amount of valid values in `data`.
182 There must be no gaps in `data` but it does not has to be fully used.
183 A chunk bust be comitted to be readable, see [HubsWriteAccess].
184
185 If you did not manually clear the chunk content, there will be stale data in the chunk from the previous round.
186 This is ok, if you set the `used` variable correcly.
187
188
189*/
190pub struct Chunk<T>{
191 /// The capacity of this chunk, do not change
192 pub capacity: usize,
193 /// Count of used elements in this chunk. Set this before you commit
194 pub used: usize,
195 /// The data in this chunk. Feel free to borrow the data to other functions, e.g. a network receive
196 pub data: Box<[T]>
197}
198impl<T> Chunk<T> {
199 fn new(initializer: &dyn HubsInitializer<T=T>) -> Self{
200 let mut v = Vec::with_capacity(CHUNK_SIZE);
201 for _ in 0 .. CHUNK_SIZE{
202 v.push(initializer.initialize_data());
203 }
204 Chunk{
205 capacity: CHUNK_SIZE,
206 used: 0,
207 data: v.into_boxed_slice(),
208 }
209 }
210}
211
212/**
213 The producer side of the Hubs.
214
215 Use this one for writing into the data structure.
216 See [Hubs] for an overview.
217 The Hubs Producer may be moved around threads.
218 Do not try to do this whilst you borrowed a chunk, I haven't tested that.
219*/
220pub struct HubsProducer<T>{
221 inner: Arc<HubsInner<T>>
222}
223
224/**
225 The consumer side of the Hubs.
226
227 Use this one for reading data from the structure.
228 See [Hubs] for an overview.
229 The Hubs Consumer may be moved around threads.
230 Do not try to do this whilst you borrowed a set of chunks, I haven't tested that.
231
232 To get all committed chunks, call [`.get_chunks_for_tick()`].
233
234 You can not get only a part of these chunks.
235 If you do not read all chunks retrieved in one read call, they are lost.
236
237 [`.get_chunks_for_tick()`]: `HubsConsumer::get_chunks_for_tick`
238*/
239pub struct HubsConsumer<T>{
240 inner: Arc<HubsInner<T>>
241}
242
243
244impl<T> HubsConsumer<T>{
245 /**
246 Gives you all currently committed Chunks in a [ChunkBlock].
247 Once given out, it is your responsibility to either process them or allow them to be lost.
248 */
249 pub fn get_chunks_for_tick(&self) -> ChunkBlock<T>{
250 self.inner.get_read_chunks_current()
251 }
252}
253
254impl<T> HubsProducer<T>{
255 /**
256 Borrows a [Chunk] from a Hubs.
257 You must give it back by calling [`.commit()`].
258 There can only be one single chunk given out at any time.
259
260 If the Hubs is full, this returns `None`.
261
262 [`.commit()`]: `HubsWriteAccess::commit()`
263 */
264 pub fn borrow_chunk_mut(&mut self) -> Option<HubsWriteAccess<T>>{
265 self.inner.borrow_chunk_mut()
266 }
267}
268
269unsafe impl<T: Send> Send for HubsInner<T> {}
270unsafe impl<T: Send> Sync for HubsInner<T> {}
271
272struct HubsInner<T>{
273 chunks: UnsafeCell<Box<[Chunk<T>]>>,
274 read_barrier: AtomicUsize,
275 read_ptr: AtomicUsize,
276 write_ptr: AtomicUsize,
277 write_barrier: AtomicUsize,
278 capacity: usize
279}
280
281
282impl<T> HubsInner<T>{
283 fn get_read_chunks_current(&self) -> ChunkBlock<T>{
284
285 let read_end = self.write_barrier.load(Ordering::SeqCst);
286 let read_start = self.read_ptr.load(Ordering::SeqCst);
287
288 if read_start == read_end {
289 return ChunkBlock::empty()
290 }
291
292 self.read_ptr.store(read_end, Ordering::SeqCst);
293
294
295 // this is tricky, we need to close the ring
296 let chunks = if read_start > read_end {
297 ChunkBlockData::Two(
298 unsafe{ &(*self.chunks.get())[read_start..] },
299 unsafe{ &(*self.chunks.get())[..read_end] }
300 )
301 }
302 else{
303 ChunkBlockData::One(unsafe{ &(*self.chunks.get())[read_start..read_end] })
304 };
305
306 ChunkBlock::new(chunks, &self)
307 }
308
309 fn borrow_chunk_mut(&self) -> Option<HubsWriteAccess<T>>{
310
311 let write_pos = self.write_ptr.load(Ordering::SeqCst);
312 let write_barrier = self.write_barrier.load(Ordering::SeqCst);
313
314 if write_pos!=write_barrier {
315 panic!("Cant borrow more than one chunk")
316 }
317
318 let read_barrier = self.read_barrier.load(Ordering::SeqCst);
319
320 if read_barrier == write_pos{
321 return None;
322 }
323
324 let next_write_pos = (write_pos + 1) % self.capacity;
325
326 self.write_ptr.store( next_write_pos, Ordering::SeqCst);
327
328 /*
329 SAFETY:
330
331 */
332 let chunk = unsafe{ &mut(*self.chunks.get())[write_pos] };
333 Some(HubsWriteAccess{
334 chunk,
335 parent: self,
336 })
337 }
338
339 fn return_chunk_block(&self, _block: &mut ChunkBlock<T>){
340
341 let read_end = self.read_barrier.load(Ordering::SeqCst);
342 let mut read_ptr = self.read_ptr.load(Ordering::SeqCst);
343
344 read_ptr = ( self.capacity + read_ptr - 1 ) % self.capacity;
345
346 if read_ptr == read_end {
347 panic!("Tried to return block to hubs that has no block given out")
348 }
349
350 self.read_barrier.store(read_ptr, Ordering::SeqCst);
351 }
352
353 fn commit_chunk(&self, _write_access: HubsWriteAccess<T>){
354 let write_pos = self.write_ptr.load(Ordering::SeqCst);
355 let mut write_barrier = self.write_barrier.load(Ordering::SeqCst);
356 write_barrier = (write_barrier + 1) % self.capacity;
357 if write_pos == write_barrier {
358 self.write_barrier.store(write_barrier, Ordering::SeqCst);
359 }
360 else {
361 panic!("Cant commit old chunk if already borrowed new chunk")
362 }
363 }
364}
365
366
367/// A Block of Chunks
368///
369/// Acess via its Iterator
370pub struct ChunkBlock<'a,T> {
371 chunks: ChunkBlockData<'a, T>,
372 parent: Option<&'a HubsInner<T>>,
373 current_chunk_index: usize,
374 in_chunk_index: usize
375}
376
377impl <'a,T> Iterator for ChunkBlock<'a,T>{
378 type Item = &'a T;
379
380 fn next(&mut self) -> Option<Self::Item> {
381 if let Some(chunk) = self.current_chunk(){
382 if chunk.used > self.in_chunk_index{
383 let data = &chunk.data[self.in_chunk_index];
384 self.in_chunk_index += 1;
385 return Some(data)
386 }
387 else{
388 // the current chunk did not hold more data, let's retry with the next chunk
389 self.current_chunk_index += 1;
390 self.in_chunk_index = 0;
391 if let Some(chunk) = self.current_chunk(){
392 if chunk.used > self.in_chunk_index{
393 let data = &chunk.data[self.in_chunk_index];
394 self.in_chunk_index += 1;
395 return Some(data)
396 }
397 }
398 }
399 }
400 None
401 }
402}
403
404
405impl <'a,T> ChunkBlock<'a,T>{
406
407 fn new( chunks: ChunkBlockData<'a, T>, parent: &'a HubsInner<T>) -> Self{
408 ChunkBlock{
409 chunks,
410 parent: Some(parent),
411 current_chunk_index: 0,
412 in_chunk_index: 0
413 }
414
415 }
416
417 fn empty() -> Self{
418 ChunkBlock{
419 chunks: ChunkBlockData::None,
420 parent: None,
421 current_chunk_index: 0,
422 in_chunk_index: 0
423 }
424
425 }
426
427 fn current_chunk(&self, ) -> Option<&'a Chunk<T>>{
428 let index = self.current_chunk_index;
429 match self.chunks {
430 ChunkBlockData::One(block) => {
431 if block.len() > index{
432 Some(&block[index])
433 }
434 else{
435 return None
436 }
437 },
438 ChunkBlockData::Two(block_0, block_1) => {
439 if block_0.len() > index{
440 Some(&block_0[index])
441 }
442 else if block_1.len() > index - block_0.len(){
443 let index = index - block_0.len();
444 Some(&block_1[index])
445 }
446 else{
447 return None
448 }
449 },
450 ChunkBlockData::None => None
451 }
452 }
453
454}
455
456
457enum ChunkBlockData<'a,T>{
458 One(&'a [Chunk<T>]),
459 Two(&'a [Chunk<T>], &'a [Chunk<T>]),
460 None
461}
462
463
464impl <'a,T> Drop for ChunkBlock<'a,T>{
465 fn drop(&mut self) {
466 match self.parent{
467 Some(parent) =>parent.return_chunk_block(self),
468 None => ()
469 }
470 }
471}
472
473/**
474Some sort of write guard for a single chunk.
475
476You need to call [`.commit()`] when you are done.
477This returns the chunk to the Hubs and will enable it to be read.
478
479You may access the internal data via `chunk`. See the documentation in [Chunk] for more details
480
481[`.commit()`]: `HubsWriteAccess::commit()`
482
483*/
484pub struct HubsWriteAccess<'a,T> {
485 pub chunk: &'a mut Chunk<T>,
486 parent: &'a HubsInner<T>
487}
488
489impl <'a,T> HubsWriteAccess<'a,T>{
490 /**
491 Commits this chunk, making it available to be read and allows the Hubs to give you a new one.
492 - You **need** to call this function if you borrowed a chunk.
493 - You **need** to call this function before you borrow the next chunk.
494 If you fail to do so, the following will happen: Bad things.
495 */
496 pub fn commit(self) {
497 self.parent.commit_chunk(self);
498 }
499}
500
501struct DefaultInitializer<T>{_data: PhantomData<T>}
502impl<T : Default> DefaultInitializer<T>{
503 fn new() -> Self{
504 DefaultInitializer{
505 _data : PhantomData
506 }
507 }
508}
509
510impl<T : Default> HubsInitializer for DefaultInitializer<T>{
511 type T=T;
512 fn initialize_data(&self)-> Self::T {
513 T::default()
514 }
515}