1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
//! An asynchronously awaitable state broadcasting channel

use futures_core::future::{Future, FusedFuture};
use futures_core::task::{Context, Poll, Waker};
use std::marker::PhantomData;
use core::pin::Pin;
use lock_api::{RawMutex, Mutex};
use crate::NoopLock;
use crate::intrusive_singly_linked_list::{LinkedList, ListNode};
use super::ChannelSendError;

/// An ID, which allows to differentiate states received from a Channel.
/// Elements with a bigger state ID (`id > otherId`) have been published more
/// recently into the Channel.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd)]
pub struct StateId(u64);

impl StateId {
    /// Returns the initial StateId, which is guaranteed to return the
    /// oldest buffered value available.
    pub fn new() -> Self {
        StateId(0)
    }
}

/// Tracks how the future had interacted with the channel
#[derive(PartialEq, Debug)]
pub enum RecvPollState {
    /// The task is not registered at the wait queue at the channel
    Unregistered,
    /// The task was added to the wait queue at the channel.
    Registered,
}

/// Tracks the channel futures waiting state.
/// Access to this struct is synchronized through the channel.
#[derive(Debug)]
pub struct RecvWaitQueueEntry {
    /// The task handle of the waiting task
    task: Option<Waker>,
    /// Current polling state
    state: RecvPollState,
    /// The minimum state ID we are waiting for
    state_id: StateId,
}

impl RecvWaitQueueEntry {
    /// Creates a new RecvWaitQueueEntry
    pub fn new(state_id: StateId) -> RecvWaitQueueEntry {
        RecvWaitQueueEntry {
            task: None,
            state_id,
            state: RecvPollState::Unregistered,
        }
    }
}

/// Adapter trait that allows Futures to generically interact with Channel
/// implementations via dynamic dispatch.
pub trait ChannelReceiveAccess<T> {
    unsafe fn try_receive(
        &self,
        wait_node: &mut ListNode<RecvWaitQueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<(StateId, T)>>;

    fn remove_receive_waiter(&self, wait_node: &mut ListNode<RecvWaitQueueEntry>);
}

/// A Future that is returned by the `receive` function on a state broadcast channel.
/// The future gets resolved with `Some((state_id, state))` when a value could be
/// received from the channel.
///
/// `state` represents the new state which had been retrieved from the channel.
///
/// `state_id` is the [`StateId`] which can be passed as a parameter to
/// `receive()` in order to fetch the next state from the channel.
///
/// If the channels gets closed and no items are still enqueued inside the
/// channel, the future will resolve to `None`.
#[must_use = "futures do nothing unless polled"]
pub struct StateReceiveFuture<'a, MutexType, T>
where T: Clone
{
    /// The channel that is associated with this StateReceiveFuture
    channel: Option<&'a dyn ChannelReceiveAccess<T>>,
    /// Node for waiting on the channel
    wait_node: ListNode<RecvWaitQueueEntry>,
    /// Marker for mutex type
    _phantom: PhantomData<MutexType>,
}

// Safety: Channel futures can be sent between threads as long as the underlying
// channel is thread-safe (Sync), which allows to poll/register/unregister from
// a different thread.
unsafe impl<'a, MutexType: Sync, T: Clone + Send> Send for StateReceiveFuture<'a, MutexType, T> {}

impl<'a, MutexType, T: Clone> core::fmt::Debug for StateReceiveFuture<'a, MutexType, T> {
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        f.debug_struct("StateReceiveFuture")
            .finish()
    }
}

impl<'a, MutexType, T: Clone> Future for StateReceiveFuture<'a, MutexType, T>
{
    type Output = Option<(StateId, T)>;

    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<(StateId, T)>> {
        // It might be possible to use Pin::map_unchecked here instead of the two unsafe APIs.
        // However this didn't seem to work for some borrow checker reasons

        // Safety: The next operations are safe, because Pin promises us that
        // the address of the wait queue entry inside StateReceiveFuture is stable,
        // and we don't move any fields inside the future until it gets dropped.
        let mut_self: &mut StateReceiveFuture<MutexType, T> = unsafe {
            Pin::get_unchecked_mut(self)
        };

        let channel = mut_self.channel.expect("polled StateReceiveFuture after completion");

        let poll_res = unsafe {
            channel.try_receive(&mut mut_self.wait_node, cx)
        };

        if poll_res.is_ready() {
            // A value was available
            mut_self.channel = None;
        }

        poll_res
    }
}

impl<'a, MutexType, T: Clone> FusedFuture for StateReceiveFuture<'a, MutexType, T> {
    fn is_terminated(&self) -> bool {
        self.channel.is_none()
    }
}

impl<'a, MutexType, T: Clone> Drop for StateReceiveFuture<'a, MutexType, T> {
    fn drop(&mut self) {
        // If this StateReceiveFuture has been polled and it was added to the
        // wait queue at the channel, it must be removed before dropping.
        // Otherwise the channel would access invalid memory.
        if let Some(channel) = self.channel {
            channel.remove_receive_waiter(&mut self.wait_node);
        }
    }
}

unsafe fn wake_waiters(mut waiters: LinkedList<RecvWaitQueueEntry>) {
    // Reverse the waiter list, so that the oldest waker (which is
    // at the end of the list), gets woken first and has the best
    // chance to grab the channel value.
    waiters.reverse();

    for waiter in waiters.into_iter() {
        if let Some(handle) = (*waiter).task.take() {
            handle.wake();
        }
        (*waiter).state = RecvPollState::Unregistered;
    }
}

/// Internal state of the state broadcast channel
struct ChannelState<T> {
    /// Whether the channel was actively closed
    is_closed: bool,
    /// The ID of the next state.
    state_id: StateId,
    /// The value which is stored inside the channel
    value: Option<T>,
    /// The list of waiters, which are waiting for the channel to get fulfilled
    waiters: LinkedList<RecvWaitQueueEntry>,
}

impl<T> ChannelState<T>
where T: Clone {
    fn new() -> ChannelState<T> {
        ChannelState::<T> {
            is_closed: false,
            state_id: StateId(0),
            value: None,
            waiters: LinkedList::new(),
        }
    }

    /// Writes a single value to the channel.
    /// If the maximum amount of values had been written, the new value will be rejected.
    fn send(&mut self, value: T) -> Result<(), ChannelSendError<T>> {
        if self.is_closed || self.state_id.0 == core::u64::MAX {
            return Err(ChannelSendError(value));
        }

        self.value = Some(value);
        self.state_id.0 += 1;

        // Wakeup all waiters
        let waiters = self.waiters.take();
        // Safety: The linked list is guaranteed to be only manipulated inside
        // the mutex in scope of the ChannelState and is thereby guaranteed to
        // be consistent.
        unsafe {
            wake_waiters(waiters);
        }

        Ok(())
    }

    fn close(&mut self) {
        if self.is_closed {
            return;
        }
        self.is_closed = true;

        // Wakeup all waiters
        let waiters = self.waiters.take();
        // Safety: The linked list is guaranteed to be only manipulated inside
        // the mutex in scope of the ChannelState and is thereby guaranteed to
        // be consistent.
        unsafe {
            wake_waiters(waiters);
        }
    }

    /// Tries to read the value from the channel.
    /// If the value isn't available yet, the StateReceiveFuture gets added to the
    /// wait queue at the channel, and will be signalled once ready.
    /// This function is only safe as long as the `wait_node`s address is guaranteed
    /// to be stable until it gets removed from the queue.
    unsafe fn try_receive(
        &mut self,
        wait_node: &mut ListNode<RecvWaitQueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<(StateId, T)>> {
        match wait_node.state {
            RecvPollState::Unregistered => {
                // The caller must wait for a value if either there is no value
                // available yet, or if the value isn't newer than what the
                // caller requested.
                let val_to_deliver = match &self.value {
                    Some(ref v) if wait_node.state_id < self.state_id => {
                        Some(v.clone())
                    },
                    Some(_) | None => None,
                };

                match val_to_deliver {
                    Some(v) => {
                        // A value that satisfies the caller is available.
                        Poll::Ready(Some((self.state_id, v)))
                    },
                    None => {
                        // Check if something was written into the channel before
                        // or the channel was closed.
                        if self.is_closed {
                            Poll::Ready(None)
                        } else {
                            // Added the task to the wait queue
                            wait_node.task = Some(cx.waker().clone());
                            wait_node.state = RecvPollState::Registered;
                            self.waiters.add_front(wait_node);
                            Poll::Pending
                        }
                    }
                }
            },
            RecvPollState::Registered => {
                // Since the channel wakes up all waiters and moves their states to unregistered
                // there can't be any value in the channel in this state.
                Poll::Pending
            },
        }
    }

    fn remove_waiter(&mut self, wait_node: &mut ListNode<RecvWaitQueueEntry>) {
        // StateReceiveFuture only needs to get removed if it had been added to
        // the wait queue of the channel. This has happened in the RecvPollState::Waiting case.
        if let RecvPollState::Registered = wait_node.state {
            if ! unsafe { self.waiters.remove(wait_node) } {
                // Panic if the address isn't found. This can only happen if the contract was
                // violated, e.g. the RecvWaitQueueEntry got moved after the initial poll.
                panic!("Future could not be removed from wait queue");
            }
            wait_node.state = RecvPollState::Unregistered;
        }
    }
}

/// A channel which can be used to synchronize the state between a sender an
/// arbitrary number of receivers.
///
/// The sender can publish its state.
///
/// The receivers can wait for state updates by announcing the most recent state
/// that is already known to them.
pub struct GenericStateBroadcastChannel<MutexType: RawMutex, T> {
    inner: Mutex<MutexType, ChannelState<T>>,
}

// The channel can be sent to other threads as long as it's not borrowed and the
// value in it can be sent to other threads.
unsafe impl<MutexType: RawMutex + Send, T: Send> Send for GenericStateBroadcastChannel<MutexType, T> {}
// The channel is thread-safe as long as a thread-safe mutex is used
unsafe impl<MutexType: RawMutex + Sync, T: Send> Sync for GenericStateBroadcastChannel<MutexType, T> {}

impl<MutexType: RawMutex, T> core::fmt::Debug for GenericStateBroadcastChannel<MutexType, T> {
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        f.debug_struct("GenericStateBroadcastChannel")
            .finish()
    }
}

impl<MutexType: RawMutex, T> GenericStateBroadcastChannel<MutexType, T>
where T: Clone {
    /// Creates a new State Broadcast Channel in the given state
    pub fn new() -> GenericStateBroadcastChannel<MutexType, T>
    where T: Clone {
        GenericStateBroadcastChannel {
            inner: Mutex::new(ChannelState::new()),
        }
    }

    /// Writes a single value to the channel.
    ///
    /// This will notify waiters about the availability of the value.
    /// If the maximum amount of values had been written to the channel,
    /// or if the channel is closed, the new value will be rejected and
    /// returned inside the error variant.
    pub fn send(&self, value: T) -> Result<(), ChannelSendError<T>> {
        self.inner.lock().send(value)
    }

    /// Closes the channel.
    ///
    /// This will notify waiters about closure, by fulfilling pending `Future`s
    /// with `None`.
    /// `send(value)` attempts which follow this call will fail with a
    /// [`ChannelSendError`].
    pub fn close(&self) {
        self.inner.lock().close()
    }

    /// Returns a future that gets fulfilled when a value is written to the channel
    /// or the channel is closed.
    /// `state_id` specifies the minimum state ID that should be retrieved
    /// by the `receive` operation.
    ///
    /// The returned [`StateReceiveFuture`] will get fulfilled with the
    /// retrieved value as well as the [`StateId`] which is required to retrieve
    /// the following state.
    pub fn receive(&self, state_id: StateId) -> StateReceiveFuture<MutexType, T> {
        StateReceiveFuture {
            channel: Some(self),
            wait_node: ListNode::new(RecvWaitQueueEntry::new(state_id)),
            _phantom: PhantomData,
        }
    }
}

impl<MutexType: RawMutex, T: Clone> ChannelReceiveAccess<T> for GenericStateBroadcastChannel<MutexType, T> {
    unsafe fn try_receive(
        &self,
        wait_node: &mut ListNode<RecvWaitQueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<(StateId, T)>> {
        self.inner.lock().try_receive(wait_node, cx)
    }

    fn remove_receive_waiter(&self, wait_node: &mut ListNode<RecvWaitQueueEntry>) {
        self.inner.lock().remove_waiter(wait_node)
    }
}

// Export a non thread-safe version using NoopLock

/// A [`GenericStateBroadcastChannel`] which is not thread-safe.
pub type LocalStateBroadcastChannel<T> = GenericStateBroadcastChannel<NoopLock, T>;

#[cfg(feature = "std")]
mod if_std {
    use super::*;

    // Export a thread-safe version using parking_lot::RawMutex

    /// A [`GenericStateBroadcastChannel`] implementation backed by [`parking_lot`].
    pub type StateBroadcastChannel<T> = GenericStateBroadcastChannel<parking_lot::RawMutex, T>;
}

#[cfg(feature = "std")]
pub use self::if_std::*;

// The next section should really integrated if the alloc feature is active,
// since it mainly requires `Arc` to be available. However for simplicity reasons
// it is currently only activated in std environments.
#[cfg(feature = "std")]
mod if_alloc {
    use super::*;

    pub mod shared {
        use super::*;
        use std::sync::atomic::{AtomicUsize, Ordering};

        struct GenericStateBroadcastChannelSharedState<MutexType, T>
        where MutexType: RawMutex, T: Clone + 'static {
            /// The amount of [`GenericSender`] instances which reference this state.
            senders: AtomicUsize,
            /// The amount of [`GenericReceiver`] instances which reference this state.
            receivers: AtomicUsize,
            /// The channel on which is acted.
            channel: GenericStateBroadcastChannel<MutexType, T>,
        }

        // Implement ChannelReceiveAccess trait for SharedChannelState, so that it can
        // be used for dynamic dispatch in futures.
        impl <MutexType, T> ChannelReceiveAccess<T> for GenericStateBroadcastChannelSharedState<MutexType, T>
        where MutexType: RawMutex, T: Clone + 'static
        {
            unsafe fn try_receive(
                &self,
                wait_node: &mut ListNode<RecvWaitQueueEntry>,
                cx: &mut Context<'_>,
            ) -> Poll<Option<(StateId, T)>>
            {
                self.channel.try_receive(wait_node, cx)
            }

            fn remove_receive_waiter(&self, wait_node: &mut ListNode<RecvWaitQueueEntry>)
            {
                self.channel.remove_receive_waiter(wait_node)
            }
        }

        /// A Future that is returned by the `receive` function on a state broadcast channel.
        /// The future gets resolved with `Some((state_id, state))` when a value could be
        /// received from the channel.
        ///
        /// `state` represents the new state which had been retrieved from the channel.
        ///
        /// `state_id` is the [`StateId`] which can be passed as a parameter to
        /// `receive()` in order to fetch the next state from the channel.
        ///
        /// If the channels gets closed and no items are still enqueued inside the
        /// channel, the future will resolve to `None`.
        #[must_use = "futures do nothing unless polled"]
        pub struct StateReceiveFuture<MutexType, T> {
            /// The Channel that is associated with this StateReceiveFuture
            channel: Option<std::sync::Arc<dyn ChannelReceiveAccess<T>>>,
            /// Node for waiting on the channel
            wait_node: ListNode<RecvWaitQueueEntry>,
            /// Marker for mutex type
            _phantom: PhantomData<MutexType>,
        }

        // Safety: Channel futures can be sent between threads as long as the underlying
        // channel is thread-safe (Sync), which allows to poll/register/unregister from
        // a different thread.
        unsafe impl<MutexType: Sync, T: Clone + Send> Send for StateReceiveFuture<MutexType, T> {}

        impl<MutexType, T> core::fmt::Debug for StateReceiveFuture<MutexType, T> {
            fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
                f.debug_struct("StateReceiveFuture")
                    .finish()
            }
        }

        impl<MutexType, T> Future for StateReceiveFuture<MutexType, T> {
            type Output = Option<(StateId, T)>;

            fn poll(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
            ) -> Poll<Option<(StateId, T)>> {
                // It might be possible to use Pin::map_unchecked here instead of the two unsafe APIs.
                // However this didn't seem to work for some borrow checker reasons

                // Safety: The next operations are safe, because Pin promises us that
                // the address of the wait queue entry inside StateReceiveFuture is stable,
                // and we don't move any fields inside the future until it gets dropped.
                let mut_self: &mut StateReceiveFuture<MutexType, T> = unsafe {
                    Pin::get_unchecked_mut(self)
                };

                let channel = mut_self.channel.take().expect(
                    "polled StateReceiveFuture after completion");

                let poll_res = unsafe {
                    channel.try_receive(&mut mut_self.wait_node, cx)
                };

                if poll_res.is_ready() {
                    // A value was available
                    mut_self.channel = None;
                }
                else {
                    mut_self.channel = Some(channel)
                }

                poll_res
            }
        }

        impl<MutexType, T> FusedFuture for StateReceiveFuture<MutexType, T> {
            fn is_terminated(&self) -> bool {
                self.channel.is_none()
            }
        }

        impl<MutexType, T> Drop for StateReceiveFuture<MutexType, T> {
            fn drop(&mut self) {
                // If this StateReceiveFuture has been polled and it was added to the
                // wait queue at the channel, it must be removed before dropping.
                // Otherwise the channel would access invalid memory.
                if let Some(channel) = &self.channel {
                    channel.remove_receive_waiter(&mut self.wait_node);
                }
            }
        }

        /// The sending side of a channel which can be used to exchange values
        /// between concurrent tasks.
        ///
        /// Values can be sent into the channel through `send`.
        pub struct GenericStateSender<MutexType, T>
        where MutexType: RawMutex, T: Clone + 'static {
            inner: std::sync::Arc<GenericStateBroadcastChannelSharedState<MutexType, T>>,
        }

        /// The receiving side of a channel which can be used to exchange values
        /// between concurrent tasks.
        ///
        /// Tasks can receive values from the channel through the `receive` method.
        /// The returned Future will get resolved when a value is sent into the channel.
        pub struct GenericStateReceiver<MutexType, T>
        where MutexType: RawMutex, T: Clone + 'static {
            inner: std::sync::Arc<GenericStateBroadcastChannelSharedState<MutexType, T>>,
        }

        impl<MutexType, T> core::fmt::Debug for GenericStateSender<MutexType, T>
        where MutexType: RawMutex, T: Clone {
            fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
                f.debug_struct("StateSender")
                    .finish()
            }
        }

        impl<MutexType, T> core::fmt::Debug for GenericStateReceiver<MutexType, T>
        where MutexType: RawMutex, T: Clone {
            fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
                f.debug_struct("StateReceiver")
                    .finish()
            }
        }

        impl<MutexType, T> Clone for GenericStateSender<MutexType, T>
        where MutexType: RawMutex, T: Clone {
            fn clone(&self) -> Self {
                let old_size = self.inner.senders.fetch_add(1, Ordering::Relaxed);
                if old_size > (core::isize::MAX) as usize {
                    panic!("Reached maximum refcount");
                }
                GenericStateSender {
                    inner: self.inner.clone(),
                }
            }
        }

        impl<MutexType, T> Drop for GenericStateSender<MutexType, T>
        where MutexType: RawMutex, T: Clone {
            fn drop(&mut self) {
                if self.inner.senders.fetch_sub(1, Ordering::Release) != 1 {
                    return;
                }
                std::sync::atomic::fence(Ordering::Acquire);
                // Close the channel, before last sender gets destroyed
                // TODO: We could potentially avoid this, if no receiver is left
                self.inner.channel.close();
            }
        }

        impl<MutexType, T> Clone for GenericStateReceiver<MutexType, T>
        where MutexType: RawMutex, T: Clone {
            fn clone(&self) -> Self {
                let old_size = self.inner.receivers.fetch_add(1, Ordering::Relaxed);
                if old_size > (core::isize::MAX) as usize {
                    panic!("Reached maximum refcount");
                }
                GenericStateReceiver {
                    inner: self.inner.clone(),
                }
            }
        }

        impl<MutexType, T> Drop for GenericStateReceiver<MutexType, T>
        where MutexType: RawMutex, T: Clone {
            fn drop(&mut self) {
                if self.inner.receivers.fetch_sub(1, Ordering::Release) != 1 {
                    return;
                }
                std::sync::atomic::fence(Ordering::Acquire);
                // Close the channel, before last receiver gets destroyed
                // TODO: We could potentially avoid this, if no sender is left
                self.inner.channel.close();
            }
        }

        /// Creates a new state broadcast channel which can be used to exchange values
        /// of type `T` between concurrent tasks.
        /// The ends of the Channel are represented through
        /// the returned Sender and Receiver.
        ///
        /// As soon es either the senders or receivers is closed, the channel
        /// itself will be closed.
        ///
        /// Example for creating a channel to transmit an integer value:
        ///
        /// ```
        /// # use futures_intrusive::channel::shared::state_broadcast_channel;
        /// let (sender, receiver) = state_broadcast_channel::<i32>();
        /// ```
        pub fn generic_state_broadcast_channel<MutexType, T>() ->
            (GenericStateSender<MutexType, T>, GenericStateReceiver<MutexType, T>)
        where MutexType: RawMutex, T: Clone + Send {
            let inner = std::sync::Arc::new(
                GenericStateBroadcastChannelSharedState {
                    channel: GenericStateBroadcastChannel::new(),
                    senders: AtomicUsize::new(1),
                    receivers: AtomicUsize::new(1),
                });

            let sender = GenericStateSender {
                inner: inner.clone(),
            };
            let receiver = GenericStateReceiver {
                inner,
            };

            (sender, receiver)
        }

        impl<MutexType, T> GenericStateSender<MutexType, T>
        where MutexType: RawMutex + 'static, T: Clone {
            /// Writes a single value to the channel.
            ///
            /// This will notify waiters about the availability of the value.
            /// If a value had been written to the channel before, or if the
            /// channel is closed, the new value will be rejected and
            /// returned inside the error variant.
            pub fn send(&self, value: T) -> Result<(), ChannelSendError<T>> {
                self.inner.channel.send(value)
            }
        }

        impl<MutexType, T> GenericStateReceiver<MutexType, T>
        where MutexType: RawMutex + 'static, T: Clone {
            /// Returns a future that gets fulfilled when a value is written to the channel
            /// or the channel is closed.
            /// `state_id` specifies the minimum state ID that should be retrieved
            /// by the `receive` operation.
            ///
            /// The returned [`StateReceiveFuture`] will get fulfilled with the
            /// retrieved value as well as the [`StateId`] which is required to retrieve
            /// the following state
            pub fn receive(&self, state_id: StateId) -> StateReceiveFuture<MutexType, T> {
                StateReceiveFuture {
                    channel: Some(self.inner.clone()),
                    wait_node: ListNode::new(RecvWaitQueueEntry::new(state_id)),
                    _phantom: PhantomData,
                }
            }
        }

        // Export parking_lot based shared channels in std mode
        #[cfg(feature = "std")]
        mod if_std {
            use super::*;

            /// A [`GenericStateSender`] implementation backed by [`parking_lot`].
            pub type StateSender<T> = GenericStateSender<parking_lot::RawMutex, T>;
            /// A [`GenericStateReceiver`] implementation backed by [`parking_lot`].
            pub type StateReceiver<T> = GenericStateReceiver<parking_lot::RawMutex, T>;

            /// Creates a new state broadcast channel.
            ///
            /// Refer to [`generic_state_broadcast_channel`] for details.
            pub fn state_broadcast_channel<T>() -> (StateSender<T>, StateReceiver<T>)
            where T: Clone + Send {
                generic_state_broadcast_channel::<parking_lot::RawMutex, T>()
            }
        }

        #[cfg(feature = "std")]
        pub use self::if_std::*;
    }
}

#[cfg(feature = "std")]
pub use self::if_alloc::*;