use core::convert::Infallible;
use core::pin::Pin;
#[doc(inline)]
pub use veecle_os_runtime_macros::actor;
use crate::datastore::{ExclusiveReader, InitializedReader, Reader, Storable, Writer};
use crate::datastore::{Slot, generational};
/// Private module backing the "sealed trait" pattern used in this file.
///
/// The module is not `pub`, so code outside this crate cannot name `sealed::Sealed` and therefore
/// cannot implement any trait that lists it as a supertrait.
mod sealed {
/// Marker supertrait; it carries no methods and is only implemented next to the sealed traits' impls.
pub trait Sealed {}
}
pub trait Actor<'a> {
type StoreRequest: StoreRequest<'a>;
type InitContext;
type Error: core::error::Error;
fn new(input: Self::StoreRequest, init_context: Self::InitContext) -> Self;
fn run(
self,
) -> impl core::future::Future<Output = Result<core::convert::Infallible, Self::Error>>;
}
/// Types that can be obtained from a [`Datastore`] and handed to an [`Actor`] as its input.
///
/// The trait is sealed (it requires `sealed::Sealed`), so it cannot be implemented outside this crate.
/// In this file it is implemented for `()`, the reader and writer types, and tuples of up to seven
/// implementors.
pub trait StoreRequest<'a>: sealed::Sealed {
/// Asynchronously builds `Self` from the pinned datastore.
#[doc(hidden)]
// `async fn` in a public trait triggers the `async_fn_in_trait` lint; it is silenced here.
// NOTE(review): presumably acceptable because the trait is sealed and this method is hidden — confirm.
#[allow(async_fn_in_trait)] async fn request(datastore: Pin<&'a impl Datastore>) -> Self;
}
// `()` is sealed so that it can implement `StoreRequest` (see the `impl StoreRequest for ()` further down).
impl sealed::Sealed for () {}
/// Access to the storage that readers and writers are created from.
///
/// Both methods take `self` as `Pin<&Self>` and hand out pinned references.
pub trait Datastore {
    /// Returns the pinned `generational::Source` of this datastore.
    fn source(self: Pin<&Self>) -> Pin<&generational::Source>;

    /// Returns the pinned [`Slot`] that holds values of type `T`.
    ///
    /// NOTE(review): what happens when the datastore has no slot for `T` is up to the
    /// implementor and not visible here — document it once confirmed.
    #[expect(rustdoc::private_intra_doc_links)]
    #[expect(private_interfaces)]
    fn slot<T: Storable + 'static>(self: Pin<&Self>) -> Pin<&Slot<T>>;
}
/// A pinned reference to a datastore is itself a datastore; every call is forwarded to the
/// referenced `S`.
impl<S: Datastore> Datastore for Pin<&S> {
    fn source(self: Pin<&Self>) -> Pin<&generational::Source> {
        // `*self` copies the inner `Pin<&S>` out of the outer pin, which is exactly the
        // receiver type `S::source` expects.
        S::source(*self)
    }

    #[expect(private_interfaces)]
    fn slot<T: Storable + 'static>(self: Pin<&Self>) -> Pin<&Slot<T>> {
        // Same forwarding as in `source`, with the slot type passed on explicitly.
        S::slot::<T>(*self)
    }
}
/// Crate-internal convenience constructors for datastore handles.
///
/// Implementors must be `Copy`; every method takes `self` by value and the returned handle
/// carries the lifetime `'a`.
pub(crate) trait DatastoreExt<'a>: Copy {
    /// Advances the generation of the datastore's source; only compiled for tests.
    #[cfg(test)]
    fn increment_generation(self);

    /// Creates a [`Reader`] for values of type `T`.
    fn reader<T: Storable + 'static>(self) -> Reader<'a, T>;

    /// Creates an [`ExclusiveReader`] for values of type `T`.
    fn exclusive_reader<T: Storable + 'static>(self) -> ExclusiveReader<'a, T>;

    /// Creates a [`Writer`] for values of type `T`.
    fn writer<T: Storable + 'static>(self) -> Writer<'a, T>;
}
/// Builds the handles from a pinned reference to any [`Datastore`].
impl<'a, S: Datastore> DatastoreExt<'a> for Pin<&'a S> {
    #[cfg(test)]
    fn increment_generation(self) {
        self.source().increment_generation();
    }

    fn reader<T: Storable + 'static>(self) -> Reader<'a, T> {
        let slot = self.slot::<T>();
        Reader::from_slot(slot)
    }

    fn exclusive_reader<T: Storable + 'static>(self) -> ExclusiveReader<'a, T> {
        let slot = self.slot::<T>();
        ExclusiveReader::from_slot(slot)
    }

    fn writer<T: Storable + 'static>(self) -> Writer<'a, T> {
        // A writer needs both the source's waiter and the slot it writes to; the waiter is
        // obtained first, then the slot.
        let waiter = self.source().waiter();
        let slot = self.slot::<T>();
        Writer::new(waiter, slot)
    }
}
/// The empty request, for an input that needs nothing from the datastore.
impl<'a> StoreRequest<'a> for () {
// Nothing to fetch: the datastore argument is ignored and `()` is returned without awaiting anything.
async fn request(_store: Pin<&'a impl Datastore>) -> Self {}
}
// Marker impl that allows `Reader` to implement the sealed `StoreRequest` trait.
impl<T: Storable + 'static> sealed::Sealed for Reader<'_, T> {}

/// Requesting a [`Reader`] creates it straight from the datastore, without waiting for anything.
impl<'a, T: Storable + 'static> StoreRequest<'a> for Reader<'a, T> {
    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
        datastore.reader::<T>()
    }
}
// Marker impl that allows `ExclusiveReader` to implement the sealed `StoreRequest` trait.
impl<T: Storable + 'static> sealed::Sealed for ExclusiveReader<'_, T> {}

/// Requesting an [`ExclusiveReader`] creates it straight from the datastore, without waiting
/// for anything.
impl<'a, T: Storable + 'static> StoreRequest<'a> for ExclusiveReader<'a, T> {
    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
        datastore.exclusive_reader::<T>()
    }
}
// Marker impl that allows `InitializedReader` to implement the sealed `StoreRequest` trait.
impl<T: Storable + 'static> sealed::Sealed for InitializedReader<'_, T> {}

/// Requesting an [`InitializedReader`] first builds a plain [`Reader`] for the slot and then
/// awaits its `wait_init()`.
///
/// Unlike the other requests in this file, this one can therefore stay pending; the
/// `multi_request_order_independence` test shows it pending until a value has been written.
impl<'a, T: Storable + 'static> StoreRequest<'a> for InitializedReader<'a, T> {
    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
        let slot = datastore.slot::<T>();
        Reader::from_slot(slot).wait_init().await
    }
}
// Marker impl that allows `Writer` to implement the sealed `StoreRequest` trait.
impl<T: Storable + 'static> sealed::Sealed for Writer<'_, T> {}

/// Requesting a [`Writer`] creates it straight from the datastore, without waiting for anything.
impl<'a, T: Storable + 'static> StoreRequest<'a> for Writer<'a, T> {
    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
        datastore.writer::<T>()
    }
}
/// Implements `sealed::Sealed` and [`StoreRequest`] for tuples built from the given type parameters.
///
/// Called with `n` identifiers it emits the impls for the `n`-tuple and then recurses on the list
/// without its first identifier, until the single-identifier arm emits the 1-tuple impls.
macro_rules! impl_request_helper {
// Base case: exactly one identifier. Emits the 1-tuple impls and ends the recursion.
// It is listed first so that a one-element list is handled here and not by the recursive arm.
($t:ident) => {
// Under the `docsrs` cfg the 1-tuple impls are marked with rustdoc's `fake_variadic`.
#[cfg_attr(docsrs, doc(fake_variadic))]
impl<'a, $t> sealed::Sealed for ($t,) { }
#[cfg_attr(docsrs, doc(fake_variadic))]
impl<'a, $t> StoreRequest<'a> for ($t,)
where
$t: StoreRequest<'a>,
{
async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
// Only one element, so it is simply awaited and wrapped in a 1-tuple.
(<$t as StoreRequest>::request(datastore).await,)
}
}
};
// Internal arm: emits the impls for the tuple of all listed identifiers.
// Under the `docsrs` cfg these impls are hidden from the documentation.
(@impl $($t:ident)*) => {
#[cfg_attr(docsrs, doc(hidden))]
impl<'a, $($t),*> sealed::Sealed for ( $( $t, )* )
where
$($t: sealed::Sealed),*
{ }
#[cfg_attr(docsrs, doc(hidden))]
impl<'a, $($t),*> StoreRequest<'a> for ( $( $t, )* )
where
$($t: StoreRequest<'a>),*
{
async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
// `join!` drives all element requests concurrently instead of awaiting them one after
// another, so an element that has to wait (such as an `InitializedReader`) does not hold
// up the requests of the other elements. The `multi_request_order_independence` test
// checks that the position of an element in the tuple does not matter.
futures::join!($( <$t as StoreRequest>::request(datastore), )*)
}
}
};
// Recursive arm: emit the impls for the full list, then continue with the list minus its head.
($head:ident $($rest:ident)*) => {
impl_request_helper!(@impl $head $($rest)*);
impl_request_helper!($($rest)*);
};
}
// Seven identifiers: generates the impls for tuples of seven down to one element.
impl_request_helper!(Z Y X W V U T);
/// Return types accepted from `#[veecle_os_runtime::actor]` functions.
///
/// Implemented for `Result<Infallible, E>` and for [`Infallible`] itself. The trait is sealed,
/// so no other type can implement it; using any other type produces the custom diagnostic below.
#[diagnostic::on_unimplemented(
    message = "#[veecle_os_runtime::actor] functions should return either a `Result<Infallible, _>` or `Infallible`",
    label = "not a valid actor return type"
)]
pub trait IsActorResult: sealed::Sealed {
    /// Error type of the normalized result.
    type Error;

    /// Converts `self` into the uniform `Result<Infallible, Self::Error>` shape.
    fn into_result(self) -> Result<Infallible, Self::Error>;
}

// Marker impls that allow the two accepted return types to implement the sealed trait.
impl<E> sealed::Sealed for Result<Infallible, E> {}
impl sealed::Sealed for Infallible {}

/// A `Result<Infallible, E>` already has the target shape and is returned untouched.
impl<E> IsActorResult for Result<Infallible, E> {
    type Error = E;

    fn into_result(self) -> Result<Infallible, Self::Error> {
        self
    }
}

/// `Infallible` has no values, so this conversion can never actually run; it only has to
/// type-check.
impl IsActorResult for Infallible {
    type Error = Infallible;

    fn into_result(self) -> Result<Infallible, Self::Error> {
        // An empty match on an uninhabited type is accepted for any result type.
        match self {}
    }
}
#[cfg(test)]
mod tests {
use core::future::Future;
use core::pin::pin;
use core::task::{Context, Poll};
use futures::future::FutureExt;
use crate::actor::{DatastoreExt, StoreRequest};
use crate::cons::{Cons, Nil};
use crate::datastore::{InitializedReader, Storable};
// Checks that a tuple request of two `InitializedReader`s completes regardless of the position of
// the readers inside the tuple: `(A, B)` and `(B, A)` are driven side by side while `A` is
// written before `B`.
#[test]
fn multi_request_order_independence() {
#[derive(Debug, Storable)]
#[storable(crate = crate)]
struct A;
#[derive(Debug, Storable)]
#[storable(crate = crate)]
struct B;
// A store with one slot for `A` and one for `B`, plus a writer for each.
let datastore = pin!(crate::execute::make_store::<Cons<A, Cons<B, Nil>>>());
let mut a_writer = datastore.as_ref().writer::<A>();
let mut b_writer = datastore.as_ref().writer::<B>();
// The same pair of readers, requested in both tuple orders.
let mut request_1 = pin!(<(InitializedReader<A>, InitializedReader<B>)>::request(
datastore.as_ref()
));
let mut request_2 = pin!(<(InitializedReader<B>, InitializedReader<A>)>::request(
datastore.as_ref()
));
// Each request gets its own waker whose wake-ups can be counted.
let (request_1_waker, request_1_wake_count) = futures_test::task::new_count_waker();
let (request_2_waker, request_2_wake_count) = futures_test::task::new_count_waker();
let mut request_1_context = Context::from_waker(&request_1_waker);
let mut request_2_context = Context::from_waker(&request_2_waker);
// Nothing has been written yet, so both requests must be pending.
assert!(matches!(
request_1.as_mut().poll(&mut request_1_context),
Poll::Pending
));
assert!(matches!(
request_2.as_mut().poll(&mut request_2_context),
Poll::Pending
));
let old_request_1_wake_count = request_1_wake_count.get();
let old_request_2_wake_count = request_2_wake_count.get();
// Advance the generation and write `A` only.
datastore.as_ref().increment_generation();
a_writer.write(A).now_or_never().unwrap();
// A wake-up after the first write is allowed but not required. If a request was woken it is
// polled again and must still be pending, because `B` has not been written.
if request_1_wake_count.get() > old_request_1_wake_count {
assert!(matches!(
request_1.as_mut().poll(&mut request_1_context),
Poll::Pending
));
}
if request_2_wake_count.get() > old_request_2_wake_count {
assert!(matches!(
request_2.as_mut().poll(&mut request_2_context),
Poll::Pending
));
}
let old_request_1_wake_count = request_1_wake_count.get();
let old_request_2_wake_count = request_2_wake_count.get();
// Advance the generation again and write `B`; now both values exist.
datastore.as_ref().increment_generation();
b_writer.write(B).now_or_never().unwrap();
// The second write must wake both requests, whatever their tuple order.
assert!(request_1_wake_count.get() > old_request_1_wake_count);
assert!(request_2_wake_count.get() > old_request_2_wake_count);
// Both requests must now complete on the next poll.
let Poll::Ready((mut request_1_a, mut request_1_b)) =
request_1.as_mut().poll(&mut request_1_context)
else {
panic!("request 1 was not ready")
};
let Poll::Ready((mut request_2_a, mut request_2_b)) =
request_2.as_mut().poll(&mut request_2_context)
else {
panic!("request 2 was not ready")
};
// Every returned reader has an update available: `wait_for_update()` resolves on its first poll.
assert!(request_1_a.wait_for_update().now_or_never().is_some());
assert!(request_1_b.wait_for_update().now_or_never().is_some());
assert!(request_2_a.wait_for_update().now_or_never().is_some());
assert!(request_2_b.wait_for_update().now_or_never().is_some());
}
}