key-message-channel 0.1.0

Multi-producer single-consumer queue capable of queuing messages by message key.
Documentation
//! Sending side of the `unbounded` channel.

use crate::{shared::Shared, Message};
use std::sync::{atomic::Ordering, Arc};

/// Sending half of the channel.
///
/// Cloning a `Sender` yields another handle that points at the same shared state.
#[derive(Debug)]
pub struct Sender<T> {
    /// Reference-counted handle to the state shared with the rest of the channel
    inner: Arc<Shared<T>>,
}

/// Error returned by [`Sender::send`] when the message could not be delivered.
///
/// The wrapped `Option` carries the value that was not sent, so the caller can
/// take it back with [`SendError::into_inner`].
#[derive(Debug)]
pub struct SendError<T>(Option<T>);

impl<T> SendError<T> {
    /// Consumes the error and returns the value that failed to send, if one was stored.
    #[inline]
    #[must_use]
    pub fn into_inner(self) -> Option<T> {
        self.0
    }
}

impl<T> std::fmt::Display for SendError<T> {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The payload is intentionally not printed: `T` is not required to be `Display`.
        f.write_str("sending on a closed channel")
    }
}

// `Debug` on `SendError<T>` is derived and therefore needs `T: Debug`, which the
// `Error` supertrait bound in turn requires here.
impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}

impl<T> Sender<T> {
    /// create a new `Sender`
    pub(crate) fn new(inner: Arc<Shared<T>>) -> Self {
        Self { inner }
    }
    /// # Errors
    ///
    /// Will return `SendError` if receiver is dropped.
    #[inline]
    pub fn send(&self, keys: Vec<String>, value: T) -> Result<(), SendError<T>> {
        if self.inner.recv_dropped() {
            return Err(SendError(Some(value)));
        }

        let msg = Arc::new(Message::new(keys, value));

        let _push_result = self.inner.queue.push(msg);

        Ok(())
    }
}

impl<T> Clone for Sender<T> {
    /// Creates another `Sender` for the same channel, bumping the shared
    /// `send_count` by one first.
    #[inline]
    fn clone(&self) -> Self {
        let shared = &self.inner;
        // Count the new handle; the previous counter value is not needed.
        let _previous = shared.send_count.fetch_add(1, Ordering::SeqCst);
        Self {
            inner: Arc::clone(shared),
        }
    }
}

impl<T> Drop for Sender<T> {
    #[inline]
    fn drop(&mut self) {
        let prev = self.inner.send_count.fetch_sub(1, Ordering::SeqCst);
        if prev == 1 {
            // 防止 Receiver 泄露
            let _ = self.inner.condition_available.notify_all();
        }
    }
}