use std::future::Future;
use std::time::Duration;
use tokio::sync::oneshot;
use serde::Deserialize;
use serde::Serialize;
use crate::CallError;
use crate::Dest;
use crate::Dests;
use crate::ExitReason;
use crate::From;
use crate::GenServerOptions;
use crate::Message;
use crate::Pid;
use crate::Process;
use crate::Receivable;
use crate::Reference;
use crate::SystemMessage;
/// Envelope for every message exchanged between a `GenServer` process and its clients.
///
/// `T` is the server's own message type. Each variant is serialized under an explicit
/// `$gen_*` name (see the `serde(rename)` attributes) rather than its Rust identifier.
#[derive(Debug, Serialize, Deserialize)]
enum GenServerMessage<T: Send + 'static> {
/// Request that expects no answer; built by `GenServer::cast` / `GenServer::cast_after`
/// and dispatched to `GenServer::handle_cast` by the server loop.
#[serde(rename = "$gen_cast")]
Cast(T),
/// Request that expects an answer; the `From` value identifies the caller so that the
/// reply can be routed back to it.
#[serde(rename = "$gen_call")]
Call(From, T),
/// Answer to a `Call`, tagged with the reference the caller is waiting on.
#[serde(rename = "$gen_reply")]
CallReply(Reference, T),
/// Asks the server to run `GenServer::terminate` and exit with the given reason.
#[serde(rename = "$gen_stop")]
Stop(ExitReason),
}
/// A server process driven by a message loop.
///
/// Implementors provide the callbacks (`init`, `handle_cast`, `handle_call`,
/// `handle_info`, `terminate`); the provided methods spawn the process and
/// implement the client side of the protocol (`cast`, `call`, `reply`, `stop`).
pub trait GenServer: Sized + Send + 'static {
    /// Type of the requests this server accepts and of the replies it produces.
    type Message: Receivable;

    /// Runs inside the newly spawned server process before it starts receiving messages.
    ///
    /// Returning `Err` aborts startup; the reason is handed back to whoever called
    /// `start` / `start_link`.
    fn init(&mut self) -> impl Future<Output = Result<(), ExitReason>> + Send;

    /// Spawns the server as a new, unlinked process and waits for `init` to finish.
    ///
    /// Resolves to the new process's `Pid`, or to the reason startup failed.
    fn start(
        self,
        options: GenServerOptions,
    ) -> impl Future<Output = Result<Pid, ExitReason>> + Send {
        let link = false;
        async move { start_gen_server(self, options, link).await }
    }

    /// Same as `start`, but the new process is linked to the calling process.
    fn start_link(
        self,
        options: GenServerOptions,
    ) -> impl Future<Output = Result<Pid, ExitReason>> + Send {
        let link = true;
        async move { start_gen_server(self, options, link).await }
    }

    /// Asks `server` to stop with `reason` and waits until it has gone down.
    ///
    /// The server is monitored before the stop request is sent, so the matching
    /// process-down notification cannot be missed.
    ///
    /// * `Ok(())` - the server exited with exactly `reason`.
    /// * `Err(other)` - the server exited with a different reason.
    /// * `Err("timeout")` - `timeout` was given and elapsed first; the monitor is removed.
    ///
    /// With `timeout == None` the wait is unbounded.
    fn stop<T: Into<Dest>>(
        server: T,
        reason: ExitReason,
        timeout: Option<Duration>,
    ) -> impl Future<Output = Result<(), ExitReason>> {
        async move {
            let target = server.into();
            let monitor = Process::monitor(target.clone());

            let request = GenServerMessage::<Self::Message>::Stop(reason.clone());
            Process::send(target, request);

            // Only the down notification belonging to our own monitor is of interest.
            let down = Process::receiver()
                .for_message::<GenServerMessage<Self::Message>>()
                .select(|incoming| match incoming {
                    Message::System(SystemMessage::ProcessDown(_, tag, _)) => *tag == monitor,
                    _ => false,
                });

            let outcome = if let Some(duration) = timeout {
                Process::timeout(duration, down).await
            } else {
                Ok(down.await)
            };

            let exit_reason = match outcome {
                Ok(Message::System(SystemMessage::ProcessDown(_, _, exit_reason))) => exit_reason,
                Err(_) => {
                    Process::demonitor(monitor);
                    return Err(ExitReason::from("timeout"));
                }
                // The selector above only lets process-down messages through.
                _ => unreachable!(),
            };

            if reason == exit_reason {
                Ok(())
            } else {
                Err(exit_reason)
            }
        }
    }

    /// Sends `message` to `servers` as a cast and returns immediately, without waiting
    /// for any answer.
    fn cast<T: Into<Dests>>(servers: T, message: Self::Message) {
        let envelope = GenServerMessage::Cast(message);
        Process::send(servers, envelope);
    }

    /// Schedules a cast of `message` to `servers` via `Process::send_after` with the
    /// given `duration`, returning the `Reference` that call produces.
    fn cast_after<T: Into<Dests>>(
        servers: T,
        message: Self::Message,
        duration: Duration,
    ) -> Reference {
        let envelope = GenServerMessage::Cast(message);
        Process::send_after(servers, envelope, duration)
    }

    /// Sends `message` to `server` and waits for its reply.
    ///
    /// The server is monitored for the duration of the call (through an alias monitor
    /// when it is not local), so a crash is reported instead of waiting out the timeout.
    /// `timeout` defaults to 5000 ms when `None`.
    ///
    /// * `Ok(reply)` - the server answered.
    /// * `Err(CallError::ServerDown(reason))` - the server went down before replying.
    /// * `Err(CallError::Timeout(_))` - no reply arrived in time; the monitor is removed
    ///   and any reply for this call already in the mailbox is discarded.
    fn call<T: Into<Dest>>(
        server: T,
        message: Self::Message,
        timeout: Option<Duration>,
    ) -> impl Future<Output = Result<Self::Message, CallError>> + Send {
        // Convert eagerly so the generic `T` itself is not captured by the future.
        let target = server.into();

        async move {
            let monitor = if target.is_local() {
                Process::monitor(target.clone())
            } else {
                Process::monitor_alias(target.clone(), true)
            };

            let from = From::new(Process::current(), monitor, target.is_remote());
            Process::send(target, GenServerMessage::Call(from, message));

            // Wait for either the tagged reply or the tagged down notification.
            let reply_or_down = Process::receiver()
                .for_message::<GenServerMessage<Self::Message>>()
                .select(|incoming| match incoming {
                    Message::User(GenServerMessage::CallReply(tag, _)) if *tag == monitor => true,
                    Message::System(SystemMessage::ProcessDown(_, tag, _)) if *tag == monitor => {
                        true
                    }
                    _ => false,
                });

            let wait = timeout.unwrap_or(Duration::from_millis(5000));
            let outcome = Process::timeout(wait, reply_or_down).await;

            match outcome {
                Ok(Message::User(GenServerMessage::CallReply(_, reply))) => {
                    Process::demonitor(monitor);
                    Ok(reply)
                }
                Ok(Message::System(SystemMessage::ProcessDown(_, _, reason))) => {
                    Err(CallError::ServerDown(reason))
                }
                Err(elapsed) => {
                    Process::demonitor(monitor);
                    // Drop a reply for this call that is already queued in the mailbox.
                    Process::receiver()
                        .for_message::<GenServerMessage<Self::Message>>()
                        .remove(|stale| match stale {
                            Message::User(GenServerMessage::CallReply(tag, _)) => *tag == monitor,
                            _ => false,
                        });
                    Err(CallError::Timeout(elapsed))
                }
                // The selector above only lets replies and process-down messages through.
                _ => unreachable!(),
            }
        }
    }

    /// Sends `message` back to the caller described by `from` as the reply to its call.
    ///
    /// The reply is addressed to the caller's tag when `from` is an alias, and to the
    /// caller's pid otherwise; in both cases it is tagged with `from.tag()`.
    fn reply(from: From, message: Self::Message) {
        let response = GenServerMessage::CallReply(from.tag(), message);
        if from.is_alias() {
            Process::send(from.tag(), response);
        } else {
            Process::send(from.pid(), response);
        }
    }

    /// Called by the server loop right before the process exits with `reason`.
    ///
    /// The default implementation does nothing.
    fn terminate(&mut self, reason: ExitReason) -> impl Future<Output = ()> + Send {
        async move {
            let _ = reason;
        }
    }

    /// Handles a message delivered through `cast` / `cast_after`.
    ///
    /// Returning `Err(reason)` makes the server terminate with that reason.
    /// The default implementation panics with `unimplemented!()`.
    fn handle_cast(
        &mut self,
        message: Self::Message,
    ) -> impl Future<Output = Result<(), ExitReason>> + Send {
        async move {
            let _ = message;
            unimplemented!();
        }
    }

    /// Handles messages outside the cast/call protocol: system messages the server loop
    /// does not consume itself, and the payload of call replies that reach the loop.
    ///
    /// Returning `Err(reason)` makes the server terminate with that reason.
    /// The default implementation ignores the message and returns `Ok(())`.
    fn handle_info(
        &mut self,
        info: Message<Self::Message>,
    ) -> impl Future<Output = Result<(), ExitReason>> + Send {
        async move {
            let _ = info;
            Ok(())
        }
    }

    /// Handles a message delivered through `call`.
    ///
    /// * `Ok(Some(reply))` - the server loop sends `reply` back to `from`.
    /// * `Ok(None)` - the server loop sends nothing; `reply` can be used with `from`
    ///   to answer separately.
    /// * `Err(reason)` - the server terminates with that reason.
    ///
    /// The default implementation panics with `unimplemented!()`.
    fn handle_call(
        &mut self,
        message: Self::Message,
        from: From,
    ) -> impl Future<Output = Result<Option<Self::Message>, ExitReason>> + Send {
        async move {
            let _ = message;
            let _ = from;
            unimplemented!();
        }
    }
}
async fn start_gen_server<T: GenServer>(
gen_server: T,
options: GenServerOptions,
link: bool,
) -> Result<Pid, ExitReason> {
let (tx, rx) = oneshot::channel::<Result<(), ExitReason>>();
let parent: Option<Pid> = link.then(Process::current);
let server = async move {
let mut gen_server = gen_server;
let mut options = options;
let parent = parent.unwrap_or(Process::current());
let registered = if let Some(name) = options.name.take() {
Process::register(Process::current(), name).is_ok()
} else {
true
};
if !registered {
tx.send(Err(ExitReason::from("already_started")))
.expect("Failed to notify parent process!");
return;
}
let timeout = if let Some(duration) = options.timeout.take() {
Process::timeout(duration, gen_server.init()).await
} else {
Ok(gen_server.init().await)
};
match timeout {
Ok(Ok(())) => {
tx.send(Ok(())).expect("Failed to notify parent process!");
}
Ok(Err(reason)) => {
tx.send(Err(reason.clone()))
.expect("Failed to notify parent process!");
return Process::exit(Process::current(), reason);
}
Err(_) => {
tx.send(Err(ExitReason::from("timeout")))
.expect("Failed to notify parent process!");
return Process::exit(Process::current(), ExitReason::from("timeout"));
}
}
loop {
let message: Message<GenServerMessage<T::Message>> = Process::receive().await;
match message {
Message::User(GenServerMessage::Cast(message)) => {
if let Err(reason) = gen_server.handle_cast(message).await {
gen_server.terminate(reason.clone()).await;
return Process::exit(Process::current(), reason);
}
}
Message::User(GenServerMessage::Call(from, message)) => {
match gen_server.handle_call(message, from).await {
Ok(Some(message)) => {
T::reply(from, message);
}
Ok(None) => {
}
Err(reason) => {
gen_server.terminate(reason.clone()).await;
return Process::exit(Process::current(), reason);
}
}
}
Message::User(GenServerMessage::CallReply(_, message)) => {
if let Err(reason) = gen_server.handle_info(Message::User(message)).await {
gen_server.terminate(reason.clone()).await;
return Process::exit(Process::current(), reason);
}
}
Message::User(GenServerMessage::Stop(reason)) => {
gen_server.terminate(reason.clone()).await;
return Process::exit(Process::current(), reason);
}
Message::System(system) => match system {
SystemMessage::Exit(epid, reason) if epid == parent => {
gen_server.terminate(reason.clone()).await;
return Process::exit(Process::current(), reason);
}
_ => {
if let Err(reason) = gen_server.handle_info(Message::System(system)).await {
gen_server.terminate(reason.clone()).await;
return Process::exit(Process::current(), reason);
}
}
},
}
}
};
let pid = if link {
Process::spawn_link(server)
} else {
Process::spawn(server)
};
rx.await
.map_err(|_| ExitReason::from("unknown"))?
.map(|_| pid)
}