use embassy_sync::{blocking_mutex::raw::RawMutex, channel::Sender};
use static_cell::StaticCell;
use embassy_sync::{
blocking_mutex::raw::NoopRawMutex,
channel::{Channel, DynamicSender, Receiver, TrySendError},
};
use crate::drop::DropBomb;
pub trait Actor: Sized {
type Message;
async fn on_mount<M>(&mut self, _: DynamicAddress<Self::Message>, _: M) -> !
where
M: Inbox<Self::Message>;
}
pub trait Inbox<M> {
#[must_use = "Must set response for message"]
async fn next(&mut self) -> M;
}
impl<'ch, M, MUT, const QUEUE_SIZE: usize> Inbox<M> for Receiver<'ch, MUT, M, QUEUE_SIZE>
where
M: 'ch,
MUT: RawMutex,
{
async fn next(&mut self) -> M {
self.receive().await
}
}
pub trait ActorAddress<M> {
fn try_notify(&self, message: M) -> Result<(), M>;
async fn notify(&self, message: M);
}
pub trait ActorRequest<M, R> {
async fn request(&self, message: M) -> R;
}
impl<M> ActorAddress<M> for DynamicSender<'_, M> {
fn try_notify(&self, message: M) -> Result<(), M> {
self.try_send(message).map_err(|e| match e {
TrySendError::Full(m) => m,
})
}
async fn notify(&self, message: M) {
self.send(message).await
}
}
impl<M, R> ActorRequest<M, R> for DynamicSender<'_, Request<M, R>> {
async fn request(&self, message: M) -> R {
let channel: Channel<NoopRawMutex, R, 1> = Channel::new();
let sender: DynamicSender<'_, R> = channel.sender().into();
let bomb = DropBomb::new();
let reply_to = unsafe {
core::mem::transmute::<
&embassy_sync::channel::DynamicSender<'_, R>,
&embassy_sync::channel::DynamicSender<'_, R>,
>(&sender)
};
let message = Request::new(message, reply_to);
self.notify(message).await;
let res = channel.receive().await;
bomb.defuse();
res
}
}
impl<M, MUT, const N: usize> ActorAddress<M> for Sender<'_, MUT, M, N>
where
MUT: RawMutex,
{
fn try_notify(&self, message: M) -> Result<(), M> {
self.try_send(message).map_err(|e| match e {
TrySendError::Full(m) => m,
})
}
async fn notify(&self, message: M) {
self.send(message).await
}
}
impl<M, R, MUT, const N: usize> ActorRequest<M, R> for Sender<'_, MUT, Request<M, R>, N>
where
M: 'static,
MUT: RawMutex + 'static,
{
async fn request(&self, message: M) -> R {
let channel: Channel<MUT, R, 1> = Channel::new();
let sender: DynamicSender<'_, R> = channel.sender().into();
let bomb = DropBomb::new();
let reply_to = unsafe {
core::mem::transmute::<
&embassy_sync::channel::DynamicSender<'_, R>,
&embassy_sync::channel::DynamicSender<'_, R>,
>(&sender)
};
let message = Request::new(message, reply_to);
self.notify(message).await;
let res = channel.receive().await;
bomb.defuse();
res
}
}
pub type DynamicAddress<M> = DynamicSender<'static, M>;
pub type DynamicRequestAddress<M, R> = DynamicSender<'static, Request<M, R>>;
pub type Address<M, MUT, const N: usize = 1> = Sender<'static, MUT, M, N>;
pub type RequestAddress<M, R, MUT, const N: usize = 1> = Sender<'static, MUT, Request<M, R>, N>;
pub struct Request<M, R>
where
R: 'static,
{
message: Option<M>,
reply_to: &'static DynamicSender<'static, R>,
}
unsafe impl<M, R> Send for Request<M, R> {}
impl<M, R> Request<M, R> {
fn new(message: M, reply_to: &'static DynamicSender<'static, R>) -> Self {
Self {
message: Some(message),
reply_to,
}
}
pub async fn process<F: FnOnce(M) -> R>(mut self, f: F) {
let reply = f(self.message.take().unwrap());
self.reply_to.send(reply).await;
}
pub async fn reply(self, value: R) {
self.reply_to.send(value).await
}
pub fn get(&self) -> &M {
self.message.as_ref().unwrap()
}
pub fn get_mut(&mut self) -> &mut M {
self.message.as_mut().unwrap()
}
}
impl<M, R> AsRef<M> for Request<M, R> {
fn as_ref(&self) -> &M {
self.message.as_ref().unwrap()
}
}
impl<M, R> AsMut<M> for Request<M, R> {
fn as_mut(&mut self) -> &mut M {
self.message.as_mut().unwrap()
}
}
pub struct ActorContext<A, MUT = NoopRawMutex, const QUEUE_SIZE: usize = 1>
where
A: Actor + 'static,
MUT: RawMutex + 'static,
{
actor: StaticCell<A>,
channel: Channel<MUT, A::Message, QUEUE_SIZE>,
}
unsafe impl<A, MUT, const QUEUE_SIZE: usize> Sync for ActorContext<A, MUT, QUEUE_SIZE>
where
A: Actor,
MUT: RawMutex,
{
}
impl<A, MUT, const QUEUE_SIZE: usize> Default for ActorContext<A, MUT, QUEUE_SIZE>
where
A: Actor,
MUT: RawMutex,
{
fn default() -> Self {
Self::new()
}
}
impl<A, MUT, const QUEUE_SIZE: usize> ActorContext<A, MUT, QUEUE_SIZE>
where
A: Actor,
MUT: RawMutex,
{
pub const fn new() -> Self {
Self {
actor: StaticCell::new(),
channel: Channel::new(),
}
}
pub async fn mount(&'static self, actor: A) -> ! {
let actor = self.actor.init(actor);
let address = self.channel.sender().into();
let receiver = self.channel.receiver();
actor.on_mount(address, receiver).await
}
pub fn dyn_address(&'static self) -> DynamicAddress<A::Message> {
self.channel.sender().into()
}
pub fn address(&'static self) -> Address<A::Message, MUT, QUEUE_SIZE> {
self.channel.sender()
}
}