completeq_rs/channel/
sender.rs

1use std::{
2    cell::Cell,
3    future::Future,
4    sync::{Arc, Mutex},
5    task::Poll,
6};
7
8use crate::{
9    result::{EmitInnerResult, EmitResult},
10    user_event::UserEvent,
11};
12
13use super::inner::CompleteQImpl;
14
15/// Event sender endpoint, creating by [`super::EventReceiver`] instance.
16pub struct EventSender<E: UserEvent> {
17    event_id: E::ID,
18    inner: Arc<Mutex<CompleteQImpl<E>>>,
19}
20
21impl<E: UserEvent> EventSender<E> {
22    pub(crate) fn new(event_id: E::ID, inner: Arc<Mutex<CompleteQImpl<E>>>) -> Self {
23        Self { event_id, inner }
24    }
25
26    pub fn send(&self, event_arg: E::Argument) -> EventSend<E> {
27        EventSend {
28            argument: Cell::new(Some(event_arg)),
29            event_id: self.event_id.clone(),
30            inner: self.inner.clone(),
31        }
32    }
33}
34
35/// create by [send](EventSender::send) method
36pub struct EventSend<E: UserEvent> {
37    // Using [`Cell`] to modify data in std::pin::Pin<&mut Self>
38    argument: Cell<Option<E::Argument>>,
39    event_id: E::ID,
40    inner: Arc<Mutex<CompleteQImpl<E>>>,
41}
42
43impl<E: UserEvent> EventSend<E> {
44    pub(crate) fn new(
45        event_id: E::ID,
46        event_arg: E::Argument,
47        inner: Arc<Mutex<CompleteQImpl<E>>>,
48    ) -> Self {
49        Self {
50            argument: Cell::new(Some(event_arg)),
51            event_id: event_id,
52            inner: inner,
53        }
54    }
55}
56
57impl<E: UserEvent> Future for EventSend<E> {
58    type Output = EmitResult;
59
60    fn poll(
61        self: std::pin::Pin<&mut Self>,
62        cx: &mut std::task::Context<'_>,
63    ) -> std::task::Poll<Self::Output> {
64        let argument = self.argument.take();
65        let result = self.inner.lock().unwrap().complete_one(
66            self.event_id.clone(),
67            argument.unwrap(),
68            cx.waker().clone(),
69        );
70
71        match result {
72            EmitInnerResult::Completed => Poll::Ready(EmitResult::Completed),
73            EmitInnerResult::Closed => Poll::Ready(EmitResult::Closed),
74            EmitInnerResult::Pending(argument) => {
75                self.argument.set(Some(argument));
76                return Poll::Pending;
77            }
78        }
79    }
80}