// everscale-network 0.5.5
//
// Implementation of the network part of the Everscale blockchain.
// Documentation
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use crossbeam_queue::SegQueue;
use tokio::sync::Barrier;

/// Lock-free queue of `T` with a synchronous `push` and an asynchronous `pop`
/// that waits until an item is available.
///
/// `push` takes `self: &Arc<Self>`, so the receiver is meant to be shared
/// behind an [`Arc`].
pub struct BroadcastReceiver<T> {
    /// Items that have been pushed and not yet popped.
    data: SegQueue<T>,
    /// Two-party barriers registered by `pop` callers that found `data` empty.
    /// A task spawned by `push` takes one and waits on it, which releases the
    /// corresponding `pop` caller so it can re-check `data`.
    barriers: SegQueue<Arc<Barrier>>,
    /// Number of `pop` calls currently in progress: incremented when `pop`
    /// starts and decremented right before it returns an item.
    sync_lock: AtomicU32,
}

impl<T: Send + 'static> BroadcastReceiver<T> {
    pub fn data_len(&self) -> usize {
        self.data.len()
    }

    pub fn barriers_len(&self) -> usize {
        self.barriers.len()
    }

    pub fn push(self: &Arc<Self>, data: T) {
        self.data.push(data);
        let receiver = self.clone();
        tokio::spawn(async move {
            while receiver.sync_lock.load(Ordering::Acquire) > 0 {
                if let Some(barrier) = receiver.barriers.pop() {
                    barrier.wait().await;
                    break;
                } else {
                    tokio::task::yield_now().await;
                }
            }
        });
    }

    pub async fn pop(&self) -> T {
        self.sync_lock.fetch_add(1, Ordering::Release);
        loop {
            match self.data.pop() {
                Some(data) => {
                    self.sync_lock.fetch_sub(1, Ordering::Release);
                    return data;
                }
                None => {
                    let barrier = Arc::new(Barrier::new(2));
                    self.barriers.push(barrier.clone());
                    barrier.wait().await;
                }
            }
        }
    }
}

impl<T> Default for BroadcastReceiver<T> {
    fn default() -> Self {
        Self {
            data: Default::default(),
            barriers: Default::default(),
            sync_lock: Default::default(),
        }
    }
}