use ohos_ffrt_sys::*;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
let shared = Arc::new(Shared::new(None));
(
UnboundedSender {
shared: shared.clone(),
},
UnboundedReceiver { shared },
)
}
pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
let shared = Arc::new(Shared::new(Some(capacity)));
(
Sender {
shared: shared.clone(),
},
Receiver { shared },
)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SendError<T>(pub T);
impl<T> std::fmt::Display for SendError<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "receiver dropped")
}
}
impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecvError;
impl std::fmt::Display for RecvError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "all senders dropped")
}
}
impl std::error::Error for RecvError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrySendError<T> {
Full(T),
Disconnected(T),
}
impl<T> std::fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TrySendError::Full(_) => write!(f, "channel full"),
TrySendError::Disconnected(_) => write!(f, "receiver dropped"),
}
}
}
impl<T: std::fmt::Debug> std::error::Error for TrySendError<T> {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
Empty,
Disconnected,
}
impl std::fmt::Display for TryRecvError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TryRecvError::Empty => write!(f, "channel empty"),
TryRecvError::Disconnected => write!(f, "all senders dropped"),
}
}
}
impl std::error::Error for TryRecvError {}
struct Inner<T> {
queue: VecDeque<T>,
capacity: Option<usize>,
sender_count: usize,
receiver_alive: bool,
recv_waker: Option<Waker>,
send_wakers: VecDeque<Waker>,
}
struct Shared<T> {
mutex: NonNull<ffrt_mutex_t>,
cond: NonNull<ffrt_cond_t>,
data: UnsafeCell<Inner<T>>,
}
impl<T> Shared<T> {
fn new(capacity: Option<usize>) -> Self {
use std::mem::MaybeUninit;
let mut uninit_mutex = Box::new(MaybeUninit::<ffrt_mutex_t>::uninit());
let mut uninit_cond = Box::new(MaybeUninit::<ffrt_cond_t>::uninit());
unsafe {
ffrt_mutex_init(uninit_mutex.as_mut_ptr(), std::ptr::null());
ffrt_cond_init(uninit_cond.as_mut_ptr(), std::ptr::null());
}
let mutex = unsafe { uninit_mutex.assume_init() };
let cond = unsafe { uninit_cond.assume_init() };
Self {
mutex: unsafe { NonNull::new_unchecked(Box::into_raw(mutex)) },
cond: unsafe { NonNull::new_unchecked(Box::into_raw(cond)) },
data: UnsafeCell::new(Inner {
queue: VecDeque::new(),
capacity,
sender_count: 1,
receiver_alive: true,
recv_waker: None,
send_wakers: VecDeque::new(),
}),
}
}
fn lock(&self) -> SharedGuard<'_, T> {
unsafe {
ffrt_mutex_lock(self.mutex.as_ptr());
}
SharedGuard { shared: self }
}
}
struct SharedGuard<'a, T> {
shared: &'a Shared<T>,
}
impl<'a, T> SharedGuard<'a, T> {
fn inner(&self) -> &Inner<T> {
unsafe { &*self.shared.data.get() }
}
fn inner_mut(&mut self) -> &mut Inner<T> {
unsafe { &mut *self.shared.data.get() }
}
fn broadcast(&self) {
unsafe {
ffrt_cond_broadcast(self.shared.cond.as_ptr());
}
}
fn wait(&mut self) {
unsafe {
ffrt_cond_wait(self.shared.cond.as_ptr(), self.shared.mutex.as_ptr());
}
}
}
impl<'a, T> Drop for SharedGuard<'a, T> {
fn drop(&mut self) {
unsafe {
ffrt_mutex_unlock(self.shared.mutex.as_ptr());
}
}
}
impl<T> Drop for Shared<T> {
fn drop(&mut self) {
unsafe {
ffrt_cond_destroy(self.cond.as_ptr());
ffrt_mutex_destroy(self.mutex.as_ptr());
let _ = Box::from_raw(self.mutex.as_ptr());
let _ = Box::from_raw(self.cond.as_ptr());
}
}
}
unsafe impl<T: Send> Send for Shared<T> {}
unsafe impl<T: Send> Sync for Shared<T> {}
pub struct Sender<T> {
shared: Arc<Shared<T>>,
}
impl<T> Sender<T> {
pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
SendFuture {
shared: self.shared.clone(),
value: Some(value),
}
.await
}
pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
let mut guard = self.shared.lock();
if !guard.inner().receiver_alive {
return Err(TrySendError::Disconnected(value));
}
let capacity = guard.inner().capacity;
if let Some(cap) = capacity {
if guard.inner().queue.len() >= cap {
return Err(TrySendError::Full(value));
}
}
guard.inner_mut().queue.push_back(value);
if let Some(waker) = guard.inner_mut().recv_waker.take() {
waker.wake();
}
guard.broadcast();
Ok(())
}
pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
let mut guard = self.shared.lock();
let mut current_value = Some(value);
loop {
if !guard.inner().receiver_alive {
return Err(SendError(current_value.take().unwrap()));
}
let capacity = guard.inner().capacity;
if let Some(cap) = capacity {
if guard.inner().queue.len() >= cap {
guard.wait();
continue;
}
}
guard
.inner_mut()
.queue
.push_back(current_value.take().unwrap());
if let Some(waker) = guard.inner_mut().recv_waker.take() {
waker.wake();
}
guard.broadcast();
return Ok(());
}
}
pub fn is_closed(&self) -> bool {
let guard = self.shared.lock();
!guard.inner().receiver_alive
}
pub fn len(&self) -> usize {
let guard = self.shared.lock();
guard.inner().queue.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn capacity(&self) -> Option<usize> {
let guard = self.shared.lock();
guard.inner().capacity
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
let mut guard = self.shared.lock();
guard.inner_mut().sender_count += 1;
drop(guard);
Sender {
shared: self.shared.clone(),
}
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let mut guard = self.shared.lock();
guard.inner_mut().sender_count -= 1;
if guard.inner().sender_count == 0 {
if let Some(waker) = guard.inner_mut().recv_waker.take() {
waker.wake();
}
guard.broadcast();
}
}
}
struct SendFuture<T> {
shared: Arc<Shared<T>>,
value: Option<T>,
}
impl<T> Future for SendFuture<T> {
type Output = Result<(), SendError<T>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
let mut guard = this.shared.lock();
if !guard.inner().receiver_alive {
return Poll::Ready(Err(SendError(this.value.take().unwrap())));
}
let capacity = guard.inner().capacity;
if let Some(cap) = capacity {
if guard.inner().queue.len() >= cap {
guard.inner_mut().send_wakers.push_back(cx.waker().clone());
return Poll::Pending;
}
}
guard
.inner_mut()
.queue
.push_back(this.value.take().unwrap());
if let Some(waker) = guard.inner_mut().recv_waker.take() {
waker.wake();
}
guard.broadcast();
Poll::Ready(Ok(()))
}
}
pub struct Receiver<T> {
shared: Arc<Shared<T>>,
}
impl<T> Receiver<T> {
pub async fn recv(&mut self) -> Option<T> {
RecvFuture {
shared: self.shared.clone(),
}
.await
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
let mut guard = self.shared.lock();
if let Some(value) = guard.inner_mut().queue.pop_front() {
if let Some(waker) = guard.inner_mut().send_wakers.pop_front() {
waker.wake();
}
guard.broadcast();
Ok(value)
} else if guard.inner().sender_count == 0 {
Err(TryRecvError::Disconnected)
} else {
Err(TryRecvError::Empty)
}
}
pub fn blocking_recv(&mut self) -> Option<T> {
let mut guard = self.shared.lock();
loop {
if let Some(value) = guard.inner_mut().queue.pop_front() {
if let Some(waker) = guard.inner_mut().send_wakers.pop_front() {
waker.wake();
}
guard.broadcast();
return Some(value);
}
if guard.inner().sender_count == 0 {
return None;
}
guard.wait();
}
}
pub fn close(&mut self) {
let mut guard = self.shared.lock();
guard.inner_mut().receiver_alive = false;
while let Some(waker) = guard.inner_mut().send_wakers.pop_front() {
waker.wake();
}
guard.broadcast();
}
pub fn len(&self) -> usize {
let guard = self.shared.lock();
guard.inner().queue.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let mut guard = self.shared.lock();
guard.inner_mut().receiver_alive = false;
while let Some(waker) = guard.inner_mut().send_wakers.pop_front() {
waker.wake();
}
guard.broadcast();
}
}
struct RecvFuture<T> {
shared: Arc<Shared<T>>,
}
impl<T> Future for RecvFuture<T> {
type Output = Option<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut guard = self.shared.lock();
if let Some(value) = guard.inner_mut().queue.pop_front() {
if let Some(waker) = guard.inner_mut().send_wakers.pop_front() {
waker.wake();
}
guard.broadcast();
return Poll::Ready(Some(value));
}
if guard.inner().sender_count == 0 {
return Poll::Ready(None);
}
guard.inner_mut().recv_waker = Some(cx.waker().clone());
Poll::Pending
}
}
pub struct UnboundedSender<T> {
shared: Arc<Shared<T>>,
}
impl<T> UnboundedSender<T> {
pub fn send(&self, value: T) -> Result<(), SendError<T>> {
let mut guard = self.shared.lock();
if !guard.inner().receiver_alive {
return Err(SendError(value));
}
guard.inner_mut().queue.push_back(value);
if let Some(waker) = guard.inner_mut().recv_waker.take() {
waker.wake();
}
guard.broadcast();
Ok(())
}
pub fn is_closed(&self) -> bool {
let guard = self.shared.lock();
!guard.inner().receiver_alive
}
pub fn len(&self) -> usize {
let guard = self.shared.lock();
guard.inner().queue.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl<T> Clone for UnboundedSender<T> {
fn clone(&self) -> Self {
let mut guard = self.shared.lock();
guard.inner_mut().sender_count += 1;
drop(guard);
UnboundedSender {
shared: self.shared.clone(),
}
}
}
impl<T> Drop for UnboundedSender<T> {
fn drop(&mut self) {
let mut guard = self.shared.lock();
guard.inner_mut().sender_count -= 1;
if guard.inner().sender_count == 0 {
if let Some(waker) = guard.inner_mut().recv_waker.take() {
waker.wake();
}
guard.broadcast();
}
}
}
pub struct UnboundedReceiver<T> {
shared: Arc<Shared<T>>,
}
impl<T> UnboundedReceiver<T> {
pub async fn recv(&mut self) -> Option<T> {
RecvFuture {
shared: self.shared.clone(),
}
.await
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
let mut guard = self.shared.lock();
if let Some(value) = guard.inner_mut().queue.pop_front() {
guard.broadcast();
Ok(value)
} else if guard.inner().sender_count == 0 {
Err(TryRecvError::Disconnected)
} else {
Err(TryRecvError::Empty)
}
}
pub fn blocking_recv(&mut self) -> Option<T> {
let mut guard = self.shared.lock();
loop {
if let Some(value) = guard.inner_mut().queue.pop_front() {
guard.broadcast();
return Some(value);
}
if guard.inner().sender_count == 0 {
return None;
}
guard.wait();
}
}
pub fn close(&mut self) {
let mut guard = self.shared.lock();
guard.inner_mut().receiver_alive = false;
guard.broadcast();
}
pub fn len(&self) -> usize {
let guard = self.shared.lock();
guard.inner().queue.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl<T> Drop for UnboundedReceiver<T> {
fn drop(&mut self) {
let mut guard = self.shared.lock();
guard.inner_mut().receiver_alive = false;
guard.broadcast();
}
}