use std::{cell::UnsafeCell, collections::HashMap, fmt::Debug, rc::Rc, sync::Arc};
use futures_channel::{
mpsc::Receiver,
oneshot::{channel as ochannel, Receiver as OReceiver, Sender as OSender},
};
use futures_util::stream::StreamExt;
use monoio::io::stream::Stream;
use service_async::{AsyncMakeService, Service};
use tracing::error;
use super::serve;
use crate::AnyError;
/// Registry of deployed and staged services, keyed by site name.
///
/// The map sits behind `Rc<UnsafeCell<..>>`: the methods of this type take `&self` and
/// reach into the map through the cell's raw pointer. Because of the `Rc`, the executor
/// is neither `Send` nor `Sync`.
pub struct ServiceExecutor<S> {
    /// Per-site deployment state; each entry holds the committed and the staged service.
    sites: Rc<UnsafeCell<HashMap<Arc<String>, ServiceDeploymentContainer<S>>>>,
}
impl<S> Default for ServiceExecutor<S> {
    /// Creates an executor with no sites registered.
    ///
    /// Implemented by hand rather than derived so that no `S: Default` bound is required.
    fn default() -> Self {
        // `Rc<UnsafeCell<HashMap<..>>>` is `Default` for any key/value type, and the
        // default value is an empty map.
        Self {
            sites: Rc::default(),
        }
    }
}
/// Failure reasons reported by the state-changing methods of `ServiceExecutor`.
///
/// The standard traits are derived so the value can be logged with `{:?}`, compared in
/// assertions and copied freely; every variant is a plain unit variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServiceCommandError {
    /// No entry is registered under the requested site name.
    SiteLookupFailed,
    /// The site exists but has no staged (precommitted) service to consume.
    ServiceNotStaged,
    /// The site exists but has no committed service whose slot could be updated.
    ServiceNotDeployed,
}
// Shared SAFETY argument for every `unsafe` block in this impl:
// the map lives behind `Rc<UnsafeCell<..>>`, so the executor cannot leave its thread.
// Each method is synchronous and the reference it creates is dropped before the method
// returns; only owned values (`Rc<S>`, `ServiceSlot<S>`, the stop sender) escape.
// NOTE(review): this assumes the methods are never re-entered while such a reference is
// live (for example from the `Drop` impl of a service being replaced) — confirm.
impl<S> ServiceExecutor<S> {
    /// Returns the service currently committed for `name`.
    ///
    /// Yields `None` when the site is unknown or has nothing committed yet.
    fn get_svc(&self, name: &Arc<String>) -> Option<Rc<S>> {
        // SAFETY: see the note at the top of this impl; read-only access.
        let sites = unsafe { &*self.sites.get() };
        sites.get(name).and_then(|s| s.get_svc())
    }

    /// Stages `svc` for `name`, creating the site entry if it does not exist yet.
    ///
    /// A previously staged service for the same site is overwritten (and dropped).
    fn precommit_svc(&self, name: Arc<String>, svc: S) {
        // SAFETY: see the note at the top of this impl.
        let sites = unsafe { &mut *self.sites.get() };
        let sh = sites
            .entry(name)
            .or_insert_with(ServiceDeploymentContainer::new);
        // `sh` is already an exclusive reference, so the cell can be opened safely.
        *sh.precommitted_service.get_mut() = Some(svc);
    }

    /// Swaps the staged service of `name` into the slot of its committed service.
    ///
    /// # Errors
    ///
    /// * `SiteLookupFailed` – no site is registered under `name`.
    /// * `ServiceNotDeployed` – the site has no committed service. This is checked before
    ///   the staged service is taken, so a staged service stays staged in this case.
    /// * `ServiceNotStaged` – nothing has been staged for the site.
    fn update_with_precommitted_svc(&self, name: &Arc<String>) -> Result<(), ServiceCommandError> {
        // SAFETY: see the note at the top of this impl.
        let sites = unsafe { &mut *self.sites.get() };
        let sh = sites
            .get_mut(name)
            .ok_or(ServiceCommandError::SiteLookupFailed)?;
        // Only shared access is needed here: `ServiceSlot::update_svc` takes `&self`.
        let hdr = sh
            .committed_service
            .as_ref()
            .ok_or(ServiceCommandError::ServiceNotDeployed)?;
        // Disjoint field of the same `&mut` container, hence no `unsafe` required.
        let precom_svc = sh
            .precommitted_service
            .get_mut()
            .take()
            .ok_or(ServiceCommandError::ServiceNotStaged)?;
        hdr.slot.update_svc(Rc::new(precom_svc));
        Ok(())
    }

    /// Turns the staged service of `name` into the committed one.
    ///
    /// Returns a clone of the new service slot together with the sender half of the stop
    /// channel whose receiver is stored in the site entry. A previously committed
    /// container for the site is replaced (and dropped).
    ///
    /// # Errors
    ///
    /// * `SiteLookupFailed` – no site is registered under `name`.
    /// * `ServiceNotStaged` – nothing has been staged for the site.
    fn deploy_staged_service(
        &self,
        name: &Arc<String>,
    ) -> Result<(ServiceSlot<S>, OSender<()>), ServiceCommandError> {
        // SAFETY: see the note at the top of this impl.
        let sites = unsafe { &mut *self.sites.get() };
        let sh = sites
            .get_mut(name)
            .ok_or(ServiceCommandError::SiteLookupFailed)?;
        let precom_svc = sh
            .precommitted_service
            .get_mut()
            .take()
            .ok_or(ServiceCommandError::ServiceNotStaged)?;
        let (new_site, stop) = ServiceSlotContainer::create(precom_svc);
        let handler_slot = new_site.slot.clone();
        sh.committed_service = Some(new_site);
        Ok((handler_slot, stop))
    }

    /// Removes the whole entry for `name`, dropping both its committed and staged state.
    ///
    /// # Errors
    ///
    /// Returns `SiteLookupFailed` when no site is registered under `name`.
    fn remove(&self, name: &Arc<String>) -> Result<(), ServiceCommandError> {
        // SAFETY: see the note at the top of this impl.
        let sites = unsafe { &mut *self.sites.get() };
        sites
            .remove(name)
            .map(drop)
            .ok_or(ServiceCommandError::SiteLookupFailed)
    }

    /// Discards the staged service of `name`, leaving the committed one untouched.
    ///
    /// Succeeds even when nothing was staged.
    ///
    /// # Errors
    ///
    /// Returns `SiteLookupFailed` when no site is registered under `name`.
    fn abort(&self, name: &Arc<String>) -> Result<(), ServiceCommandError> {
        // SAFETY: see the note at the top of this impl.
        let sites = unsafe { &mut *self.sites.get() };
        let sh = sites
            .get_mut(name)
            .ok_or(ServiceCommandError::SiteLookupFailed)?;
        *sh.precommitted_service.get_mut() = None;
        Ok(())
    }
}
/// Deployment state of a single site: what is live and what is waiting to go live.
pub struct ServiceDeploymentContainer<S> {
    /// The service currently deployed for the site, or `None` if nothing has been
    /// deployed yet.
    committed_service: Option<ServiceSlotContainer<S>>,
    /// A service that has been staged but not yet deployed; `None` when nothing is staged.
    precommitted_service: UnsafeCell<Option<S>>,
}
/// A committed service together with the receiving half of its stop channel.
struct ServiceSlotContainer<S> {
    /// Shared, swappable handle to the committed service.
    slot: ServiceSlot<S>,
    /// Receiver of the oneshot channel created in `create`; it is only stored, never
    /// polled in this file.
    // NOTE(review): presumably dropping it is how the task holding the sender learns that
    // the deployment went away — confirm against `serve`.
    _stop: OReceiver<()>,
}
impl<S> ServiceDeploymentContainer<S> {
    /// Creates an entry with neither a committed nor a staged service.
    const fn new() -> Self {
        Self {
            committed_service: None,
            precommitted_service: UnsafeCell::new(None),
        }
    }

    /// Returns a new `Rc` handle to the committed service, or `None` if the site has
    /// nothing committed.
    fn get_svc(&self) -> Option<Rc<S>> {
        let deployed = self.committed_service.as_ref()?;
        Some(deployed.slot.get_svc())
    }
}
impl<S> ServiceSlotContainer<S> {
    /// Wraps `handler` in a fresh [`ServiceSlot`] and opens a oneshot stop channel.
    ///
    /// The receiver is kept inside the returned container; the sender is handed back to
    /// the caller as the second tuple element.
    fn create(handler: S) -> (Self, OSender<()>) {
        let (stop_tx, stop_rx) = ochannel();
        let container = Self {
            slot: Rc::new(handler).into(),
            _stop: stop_rx,
        };
        (container, stop_tx)
    }
}
/// A cheaply clonable handle to a service that can be swapped at runtime.
///
/// All clones share one inner cell, so [`ServiceSlot::update_svc`] on any clone changes
/// what every clone's [`ServiceSlot::get_svc`] returns afterwards. The `Rc` inside makes
/// the type neither `Send` nor `Sync`.
pub struct ServiceSlot<S>(Rc<UnsafeCell<Rc<S>>>);

impl<S> Clone for ServiceSlot<S> {
    /// Creates another handle to the same cell. Written by hand so that no `S: Clone`
    /// bound is needed.
    fn clone(&self) -> Self {
        Self(Rc::clone(&self.0))
    }
}

impl<S> From<Rc<S>> for ServiceSlot<S> {
    /// Creates a new, unshared slot that initially holds `value`.
    fn from(value: Rc<S>) -> Self {
        Self(Rc::new(UnsafeCell::new(value)))
    }
}

impl<S> From<Rc<UnsafeCell<Rc<S>>>> for ServiceSlot<S> {
    /// Adopts an existing shared cell without allocating.
    // NOTE(review): whoever keeps another handle to `value` must not hold a reference
    // into the cell while `update_svc`/`get_svc` run — confirm against callers.
    fn from(value: Rc<UnsafeCell<Rc<S>>>) -> Self {
        Self(value)
    }
}

impl<S> ServiceSlot<S> {
    /// Replaces the service held by the slot with `shared_svc`.
    ///
    /// `Rc` handles previously returned by [`ServiceSlot::get_svc`] keep the old service
    /// alive; the old service is destroyed once its last handle is gone.
    pub fn update_svc(&self, shared_svc: Rc<S>) {
        // SAFETY: the slot is `!Send`/`!Sync`, and neither method of this impl lets a
        // reference into the cell outlive the call. The exclusive reference below exists
        // only for the duration of `mem::replace`, which runs no user code.
        let previous = unsafe { std::mem::replace(&mut *self.0.get(), shared_svc) };
        // Drop the old handle only after the new one is installed: if this was the last
        // handle, `S::drop` runs here and may call back into this slot. With a plain
        // assignment the old value would be dropped in place while still stored in the
        // cell.
        drop(previous);
    }

    /// Returns a new `Rc` handle to the service currently held by the slot.
    pub fn get_svc(&self) -> Rc<S> {
        // SAFETY: see `update_svc`; the shared reference ends before this call returns
        // and `Rc::clone` runs no user code.
        let current = unsafe { &*self.0.get() };
        Rc::clone(current)
    }
}
/// Commands understood by the service executor.
///
/// `F` is the factory type used to build a service and `LF` the factory type used to
/// build a listener. Every variant carries the name of the site it targets.
// NOTE(review): `dead_code` is allowed presumably because not every variant is
// constructed inside this crate — confirm.
#[allow(dead_code)]
#[derive(Clone)]
pub enum ServiceCommand<F, LF> {
    /// Build a service with `F` and stage it for the site.
    Precommit(Arc<String>, F),
    /// Swap the staged service into the site's already deployed slot.
    Update(Arc<String>),
    /// Build a listener with `LF` and deploy the staged service behind it.
    Commit(Arc<String>, LF),
    /// Build service and listener, then stage and deploy in one step.
    PrepareAndCommit(Arc<String>, F, LF),
    /// Discard the staged service of the site.
    Abort(Arc<String>),
    /// Remove the site entirely.
    Remove(Arc<String>),
}
/// Error returned when executing a [`ServiceCommand`].
///
/// `SE` is the error type of the service factory, `LE` that of the listener factory.
#[derive(thiserror::Error, Debug)]
pub enum CommandError<SE, LE> {
    /// The service factory failed.
    #[error("build service error: {0:?}")]
    BuildService(SE),
    /// The listener factory failed.
    #[error("build listener error: {0:?}")]
    BuildListener(LE),
    /// The targeted site is not registered.
    #[error("site not exist")]
    SiteNotExist,
    /// The site has no staged service to consume.
    #[error("preparation not exist")]
    PreparationNotExist,
    /// The site has no deployed service to update.
    #[error("previous handler not exist")]
    PreviousHandlerNotExist,
}
impl<SE, LE> From<ServiceCommandError> for CommandError<SE, LE> {
fn from(value: ServiceCommandError) -> Self {
match value {
ServiceCommandError::SiteLookupFailed => Self::SiteNotExist,
ServiceCommandError::ServiceNotStaged => Self::PreparationNotExist,
ServiceCommandError::ServiceNotDeployed => Self::PreviousHandlerNotExist,
}
}
}
/// A [`ServiceCommand`] paired with the channel on which its outcome is reported.
pub struct ServiceCommandTask<F, LF> {
    /// The command to execute.
    cmd: ServiceCommand<F, LF>,
    /// Sender half of the oneshot channel that carries the execution result back.
    result: OSender<Result<(), AnyError>>,
}
impl<F, LF> ServiceCommandTask<F, LF> {
    /// Wraps `cmd` in a task and returns it together with the receiver on which the
    /// execution result will arrive.
    pub fn new(cmd: ServiceCommand<F, LF>) -> (Self, OReceiver<Result<(), AnyError>>) {
        let (result, outcome_rx) = ochannel();
        let task = Self { cmd, result };
        (task, outcome_rx)
    }
}
/// Something that can be executed against a [`ServiceExecutor`].
///
/// `S` is the service type managed by the executor. `A` is left open at trait level; the
/// implementation for [`ServiceCommand`] uses it as the item type produced by the
/// listener and accepted by the service.
pub trait Execute<A, S> {
    /// Error produced by a failed execution; must be convertible into [`AnyError`].
    type Error: Into<AnyError>;

    /// Consumes `self` and applies it to `controller`.
    fn execute(
        self,
        controller: &ServiceExecutor<S>,
    ) -> impl std::future::Future<Output = Result<(), Self::Error>>;
}
/// Executes a [`ServiceCommand`] against the executor.
///
/// Factory failures are reported as `BuildService` / `BuildListener`; failures of the
/// executor's state methods are converted through `From<ServiceCommandError>`.
impl<F, LF, A, E, S> Execute<A, S> for ServiceCommand<F, LF>
where
    F: AsyncMakeService<Service = S>,
    F::Error: Debug + Send + Sync + 'static,
    LF: AsyncMakeService,
    LF::Service: Stream<Item = Result<A, E>> + 'static,
    E: Debug + Send + Sync + 'static,
    LF::Error: Debug + Send + Sync + 'static,
    S: Service<A> + 'static,
    S::Error: Debug,
    A: 'static,
{
    type Error = CommandError<F::Error, LF::Error>;

    async fn execute(self, controller: &ServiceExecutor<S>) -> Result<(), Self::Error> {
        match self {
            // Build a new service, offering the currently deployed one (if any) to the
            // factory, and stage the result.
            ServiceCommand::Precommit(site, svc_factory) => {
                let previous = controller.get_svc(&site);
                let staged = svc_factory
                    .make_via_ref(previous.as_deref())
                    .await
                    .map_err(CommandError::BuildService)?;
                controller.precommit_svc(site, staged);
            }
            // Swap the staged service into the already deployed slot.
            ServiceCommand::Update(site) => controller.update_with_precommitted_svc(&site)?,
            // Build the listener first, then deploy the staged service and start serving.
            ServiceCommand::Commit(site, lst_factory) => {
                let listener = lst_factory
                    .make()
                    .await
                    .map_err(CommandError::BuildListener)?;
                let (slot, stop_tx) = controller.deploy_staged_service(&site)?;
                monoio::spawn(serve(listener, slot, stop_tx));
            }
            // Build service and listener from scratch, then stage and deploy right away.
            // Whatever was staged for the site before is overwritten.
            ServiceCommand::PrepareAndCommit(site, svc_factory, lst_factory) => {
                let fresh = svc_factory
                    .make()
                    .await
                    .map_err(CommandError::BuildService)?;
                let listener = lst_factory
                    .make()
                    .await
                    .map_err(CommandError::BuildListener)?;
                controller.precommit_svc(site.clone(), fresh);
                let (slot, stop_tx) = controller.deploy_staged_service(&site)?;
                monoio::spawn(serve(listener, slot, stop_tx));
            }
            // Drop the staged service, keep the deployed one.
            ServiceCommand::Abort(site) => controller.abort(&site)?,
            // Forget the site altogether.
            ServiceCommand::Remove(site) => controller.remove(&site)?,
        }
        Ok(())
    }
}
impl<S> ServiceExecutor<S> {
pub async fn run<F, LF, A>(&self, mut rx: Receiver<ServiceCommandTask<F, LF>>)
where
ServiceCommand<F, LF>: Execute<A, S>,
{
while let Some(upd) = rx.next().await {
if let Err(e) = upd
.result
.send(upd.cmd.execute(self).await.map_err(Into::into))
{
error!("unable to send back result: {e:?}");
}
}
}
}