use crate::{router::Router, tokio::runtime::Runtime};
use core::future::Future;
use ockam_core::{
compat::sync::{Arc, Weak},
Result,
};
#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
#[cfg(feature = "metrics")]
use core::sync::atomic::{AtomicBool, Ordering};
#[cfg(feature = "std")]
use opentelemetry::trace::FutureExt;
use ockam_core::flow_control::FlowControls;
#[cfg(feature = "std")]
use ockam_core::{
errcode::{Kind, Origin},
Error,
};
pub struct Executor {
runtime: Arc<Runtime>,
router: Arc<Router>,
#[cfg(feature = "metrics")]
metrics: Arc<Metrics>,
}
impl Executor {
pub fn new(runtime: Arc<Runtime>, flow_controls: &FlowControls) -> Self {
let router = Arc::new(Router::new(flow_controls));
#[cfg(feature = "metrics")]
let metrics = Metrics::new(runtime.handle().clone(), router.get_metrics_readout());
Self {
runtime,
router,
#[cfg(feature = "metrics")]
metrics,
}
}
pub(crate) fn router(&self) -> Weak<Router> {
Arc::downgrade(&self.router)
}
pub fn get_runtime(&self) -> Arc<Runtime> {
self.runtime.clone()
}
#[cfg(feature = "std")]
pub fn execute<F, T, E>(&mut self, future: F) -> Result<F::Output>
where
F: Future<Output = core::result::Result<T, E>> + Send + 'static,
T: Send + 'static,
E: Send + 'static,
{
#[cfg(feature = "metrics")]
let alive = Arc::new(AtomicBool::from(true));
#[cfg(feature = "metrics")]
self.metrics.clone().spawn(alive.clone());
let future = Executor::wrapper(self.router.clone(), future);
let join_body = self.runtime.spawn(future.with_current_context());
#[cfg(feature = "metrics")]
alive.fetch_or(true, Ordering::Acquire);
let res = self
.runtime
.block_on(join_body)
.map_err(|e| Error::new(Origin::Executor, Kind::Unknown, e))?;
Ok(res)
}
#[cfg(feature = "std")]
async fn wrapper<F, T, E>(router: Arc<Router>, future: F) -> core::result::Result<T, E>
where
F: Future<Output = core::result::Result<T, E>> + Send + 'static,
{
match future.await {
Ok(val) => {
debug!("Wait for router termination...");
router.wait_termination().await;
debug!("Router terminated successfully!...");
Ok(val)
}
Err(e) => {
if let Err(error) = router.shutdown_graceful(1).await {
error!("Failed to stop gracefully: {}", error);
}
Err(e)
}
}
}
#[cfg(not(feature = "std"))]
pub fn execute<F>(&mut self, future: F) -> Result<()>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let _join = self.runtime.spawn(future);
let router = self.router.clone();
crate::tokio::runtime::execute(&self.runtime, async move {
router.wait_termination().await;
});
Ok(())
}
}