completeq_rs/channel/
sender.rs1use std::{
2 cell::Cell,
3 future::Future,
4 sync::{Arc, Mutex},
5 task::Poll,
6};
7
8use crate::{
9 result::{EmitInnerResult, EmitResult},
10 user_event::UserEvent,
11};
12
13use super::inner::CompleteQImpl;
14
15pub struct EventSender<E: UserEvent> {
17 event_id: E::ID,
18 inner: Arc<Mutex<CompleteQImpl<E>>>,
19}
20
21impl<E: UserEvent> EventSender<E> {
22 pub(crate) fn new(event_id: E::ID, inner: Arc<Mutex<CompleteQImpl<E>>>) -> Self {
23 Self { event_id, inner }
24 }
25
26 pub fn send(&self, event_arg: E::Argument) -> EventSend<E> {
27 EventSend {
28 argument: Cell::new(Some(event_arg)),
29 event_id: self.event_id.clone(),
30 inner: self.inner.clone(),
31 }
32 }
33}
34
35pub struct EventSend<E: UserEvent> {
37 argument: Cell<Option<E::Argument>>,
39 event_id: E::ID,
40 inner: Arc<Mutex<CompleteQImpl<E>>>,
41}
42
43impl<E: UserEvent> EventSend<E> {
44 pub(crate) fn new(
45 event_id: E::ID,
46 event_arg: E::Argument,
47 inner: Arc<Mutex<CompleteQImpl<E>>>,
48 ) -> Self {
49 Self {
50 argument: Cell::new(Some(event_arg)),
51 event_id: event_id,
52 inner: inner,
53 }
54 }
55}
56
57impl<E: UserEvent> Future for EventSend<E> {
58 type Output = EmitResult;
59
60 fn poll(
61 self: std::pin::Pin<&mut Self>,
62 cx: &mut std::task::Context<'_>,
63 ) -> std::task::Poll<Self::Output> {
64 let argument = self.argument.take();
65 let result = self.inner.lock().unwrap().complete_one(
66 self.event_id.clone(),
67 argument.unwrap(),
68 cx.waker().clone(),
69 );
70
71 match result {
72 EmitInnerResult::Completed => Poll::Ready(EmitResult::Completed),
73 EmitInnerResult::Closed => Poll::Ready(EmitResult::Closed),
74 EmitInnerResult::Pending(argument) => {
75 self.argument.set(Some(argument));
76 return Poll::Pending;
77 }
78 }
79 }
80}