key_message_channel/
sender.rs

//! Send side of the `unbounded` channel.
2
3use crate::{shared::Shared, Message};
4use std::sync::{atomic::Ordering, Arc};
5
6/// Sender
7#[derive(Debug)]
8pub struct Sender<T> {
9    /// reference to the shared area
10    inner: Arc<Shared<T>>,
11}
12
/// Error returned by `Sender::send` when the message could not be delivered.
///
/// The wrapped `Option` carries the value that was rejected; `send` always
/// fills it with `Some(value)` so the caller can get the value back through
/// [`SendError::into_inner`].
#[derive(Debug)]
pub struct SendError<T>(Option<T>);

impl<T> SendError<T> {
    /// Consume the error and return the value that failed to send, if any.
    #[inline]
    #[must_use]
    pub fn into_inner(self) -> Option<T> {
        self.0
    }
}

impl<T> std::fmt::Display for SendError<T> {
    /// Write a short, payload-independent description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("sending on a closed channel")
    }
}

// `Error` requires `Debug`, and the derived `Debug` impl needs `T: Debug`.
impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
16
17impl<T> Sender<T> {
18    /// create a new `Sender`
19    pub(crate) fn new(inner: Arc<Shared<T>>) -> Self {
20        Self { inner }
21    }
22    /// # Errors
23    ///
24    /// Will return `SendError` if receiver is dropped.
25    #[inline]
26    pub fn send(&self, keys: Vec<String>, value: T) -> Result<(), SendError<T>> {
27        if self.inner.recv_dropped() {
28            return Err(SendError(Some(value)));
29        }
30
31        let msg = Arc::new(Message::new(keys, value));
32
33        let _push_result = self.inner.queue.push(msg);
34
35        Ok(())
36    }
37}
38
39impl<T> Clone for Sender<T> {
40    #[inline]
41    fn clone(&self) -> Self {
42        let _ = self.inner.send_count.fetch_add(1, Ordering::SeqCst);
43        Self {
44            inner: Arc::clone(&self.inner),
45        }
46    }
47}
48
49impl<T> Drop for Sender<T> {
50    #[inline]
51    fn drop(&mut self) {
52        let prev = self.inner.send_count.fetch_sub(1, Ordering::SeqCst);
53        if prev == 1 {
54            // 防止 Receiver 泄露
55            let _ = self.inner.condition_available.notify_all();
56        }
57    }
58}