use async_lock::Mutex;
#[cfg(not(target_arch = "wasm32"))]
use axum::Router;
use futures::prelude::*;
use std::fmt;
use std::sync::Arc;
use crate::runtime;
#[cfg(not(target_arch = "wasm32"))]
use crate::runtime::ControlPort;
use crate::runtime::Error;
use crate::runtime::Flowgraph;
use crate::runtime::FlowgraphHandle;
use crate::runtime::FlowgraphId;
use crate::runtime::FlowgraphMessage;
use crate::runtime::FlowgraphTask;
use crate::runtime::RunningFlowgraph;
use crate::runtime::channel::mpsc::channel;
use crate::runtime::channel::oneshot;
use crate::runtime::config;
use crate::runtime::scheduler::Scheduler;
#[cfg(not(target_arch = "wasm32"))]
use crate::runtime::scheduler::SmolScheduler;
use crate::runtime::scheduler::Task;
#[cfg(target_arch = "wasm32")]
use crate::runtime::scheduler::WasmMainScheduler;
/// Scheduler type used by [`Runtime`] and [`RuntimeHandle`] when no scheduler
/// type parameter is given.
///
/// On every target except `wasm32` this is [`SmolScheduler`].
#[cfg(not(target_arch = "wasm32"))]
pub type DefaultScheduler = SmolScheduler;
/// Scheduler type used by [`Runtime`] and [`RuntimeHandle`] when no scheduler
/// type parameter is given.
///
/// On `wasm32` targets this is [`WasmMainScheduler`].
#[cfg(target_arch = "wasm32")]
pub type DefaultScheduler = WasmMainScheduler;
/// Entry point for starting and running [`Flowgraph`]s on a scheduler `S`.
///
/// The runtime records a [`FlowgraphHandle`] for every flowgraph it starts.
/// That list is shared with each [`RuntimeHandle`] created via `handle()`.
/// On non-`wasm32` targets the runtime additionally owns a `ControlPort`.
pub struct Runtime<S = DefaultScheduler> {
/// Scheduler that flowgraphs and spawned futures are handed to.
scheduler: S,
/// Handles of the flowgraphs started so far, in start order.
flowgraphs: Arc<Mutex<Vec<FlowgraphHandle>>>,
/// Control port built in `with_config`; it is stored but never read in this
/// file (presumably kept only so it lives as long as the runtime — confirm).
#[cfg(not(target_arch = "wasm32"))]
_control_port: ControlPort<S>,
}
#[cfg(not(target_arch = "wasm32"))]
impl Runtime<DefaultScheduler> {
    /// Builds a runtime with a default-constructed [`DefaultScheduler`] and an
    /// empty [`Router`], i.e. without any custom routes.
    pub fn new() -> Self {
        // The router is created before the scheduler, as in the delegating form.
        let routes = Router::new();
        Self::with_config(DefaultScheduler::default(), routes)
    }

    /// Builds a runtime with a default-constructed [`DefaultScheduler`] and
    /// forwards `routes` to the control port.
    pub fn with_custom_routes(routes: Router) -> Self {
        let scheduler = DefaultScheduler::default();
        Self::with_config(scheduler, routes)
    }
}
/// Emits a `debug!` message when the runtime is dropped; no other cleanup is
/// performed here.
impl<S> Drop for Runtime<S> {
fn drop(&mut self) {
debug!("Runtime dropped");
}
}
#[cfg(target_arch = "wasm32")]
impl Runtime<DefaultScheduler> {
    /// Builds a runtime driven by a default-constructed [`DefaultScheduler`].
    pub fn new() -> Self {
        let scheduler = DefaultScheduler::default();
        Self::with_scheduler(scheduler)
    }
}
/// `Runtime::default()` is the same as `Runtime::new()`.
impl Default for Runtime<DefaultScheduler> {
fn default() -> Self {
Self::new()
}
}
impl<S: Scheduler> Runtime<S> {
pub fn spawn<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
self.scheduler.spawn(future)
}
pub fn spawn_background<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) {
self.scheduler.spawn(future).detach();
}
pub async fn start_async(&self, fg: Flowgraph) -> Result<RunningFlowgraph, Error> {
let running = start_flowgraph(self.scheduler.clone(), fg).await?;
self.flowgraphs.lock().await.push(running.handle());
Ok(running)
}
pub async fn run_async(&self, fg: Flowgraph) -> Result<Flowgraph, Error> {
self.start_async(fg).await?.wait_async().await
}
pub fn scheduler(&self) -> &S {
&self.scheduler
}
pub fn handle(&self) -> RuntimeHandle<S> {
RuntimeHandle {
scheduler: self.scheduler.clone(),
flowgraphs: self.flowgraphs.clone(),
}
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<S: Scheduler> Runtime<S> {
    /// Blocking counterpart of `start_async`: drives the start-up future to
    /// completion with `runtime::block_on`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `start_async`.
    pub fn start(&self, fg: Flowgraph) -> Result<RunningFlowgraph, Error> {
        let starting = self.start_async(fg);
        runtime::block_on(starting)
    }

    /// Starts `fg` (blocking) and then calls `wait()` on the running flowgraph,
    /// returning its result.
    ///
    /// # Errors
    ///
    /// Returns the error from starting the flowgraph or from waiting on it.
    pub fn run(&self, fg: Flowgraph) -> Result<Flowgraph, Error> {
        self.start(fg)?.wait()
    }
}
#[cfg(not(target_arch = "wasm32"))]
impl<S: Scheduler + Sync> Runtime<S> {
    /// Builds a runtime on `scheduler` with an empty [`Router`], i.e. without
    /// custom routes.
    pub fn with_scheduler(scheduler: S) -> Self {
        let routes = Router::new();
        Self::with_config(scheduler, routes)
    }

    /// Builds a runtime on `scheduler`, passing `routes` to the control port.
    ///
    /// Calls `runtime::init()` first, then creates an empty flowgraph list and
    /// a [`ControlPort`]. The control port receives a [`RuntimeHandle`] that
    /// shares that list and holds a clone of the scheduler.
    pub fn with_config(scheduler: S, routes: Router) -> Self {
        runtime::init();

        let flowgraphs = Arc::new(Mutex::new(Vec::new()));
        let control_port = ControlPort::new(
            RuntimeHandle {
                scheduler: scheduler.clone(),
                flowgraphs: Arc::clone(&flowgraphs),
            },
            routes,
        );

        Self {
            scheduler,
            flowgraphs,
            _control_port: control_port,
        }
    }
}
#[cfg(target_arch = "wasm32")]
impl<S: Scheduler> Runtime<S> {
    /// Builds a runtime on `scheduler` with an empty flowgraph list.
    ///
    /// Calls `runtime::init()` before constructing the runtime.
    pub fn with_scheduler(scheduler: S) -> Self {
        runtime::init();
        Self {
            scheduler,
            flowgraphs: Arc::new(Mutex::new(Vec::new())),
        }
    }
}
/// Handle to a [`Runtime`] that can start flowgraphs and look up the
/// flowgraphs that have been started.
///
/// A handle holds its own scheduler value (a clone, when obtained through
/// `Runtime::handle`) and shares the runtime's list of flowgraph handles.
pub struct RuntimeHandle<S = DefaultScheduler> {
/// Scheduler used to start flowgraphs through this handle.
scheduler: S,
/// Shared list of flowgraph handles; a [`FlowgraphId`] is an index into it.
flowgraphs: Arc<Mutex<Vec<FlowgraphHandle>>>,
}
impl<S: Clone> Clone for RuntimeHandle<S> {
fn clone(&self) -> Self {
Self {
scheduler: self.scheduler.clone(),
flowgraphs: self.flowgraphs.clone(),
}
}
}
/// Debug output lists only the `flowgraphs` field; the scheduler is left out,
/// so `S` does not need to implement `Debug`.
impl<S> fmt::Debug for RuntimeHandle<S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("RuntimeHandle");
        out.field("flowgraphs", &self.flowgraphs);
        out.finish()
    }
}
/// Two handles compare equal when they share the very same flowgraph list
/// allocation; the schedulers are not compared.
impl<S> PartialEq for RuntimeHandle<S> {
    fn eq(&self, other: &Self) -> bool {
        let (ours, theirs) = (&self.flowgraphs, &other.flowgraphs);
        Arc::ptr_eq(ours, theirs)
    }
}
impl<S: Scheduler> RuntimeHandle<S> {
pub async fn start(&self, fg: Flowgraph) -> Result<RunningFlowgraph, Error> {
let running = start_flowgraph(self.scheduler.clone(), fg).await?;
self.add_flowgraph(running.handle()).await;
Ok(running)
}
async fn add_flowgraph(&self, handle: FlowgraphHandle) -> FlowgraphId {
let mut v = self.flowgraphs.lock().await;
let l = v.len();
v.push(handle);
FlowgraphId(l)
}
pub async fn get_flowgraph(&self, id: FlowgraphId) -> Option<FlowgraphHandle> {
self.flowgraphs.lock().await.get(id.0).cloned()
}
pub async fn get_flowgraphs(&self) -> Vec<FlowgraphId> {
self.flowgraphs
.lock()
.await
.iter()
.enumerate()
.map(|x| FlowgraphId(x.0))
.collect()
}
}
async fn start_flowgraph<S: Scheduler>(
scheduler: S,
fg: Flowgraph,
) -> Result<RunningFlowgraph, Error> {
let queue_size = config::config().queue_size;
let (fg_inbox, fg_inbox_rx) = channel::<FlowgraphMessage>(queue_size);
let (tx, rx) = oneshot::channel::<Result<(), Error>>();
let scheduler_clone = scheduler.clone();
let task =
scheduler.spawn(fg.run_flowgraph(scheduler_clone, fg_inbox.clone(), fg_inbox_rx, tx));
rx.await
.map_err(|_| Error::RuntimeError("run_flowgraph panicked".to_string()))??;
let handle = FlowgraphHandle::new(fg_inbox);
Ok(RunningFlowgraph::new(handle, FlowgraphTask::new(task)))
}