use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use tokio::runtime::Runtime;
use crate::{
ActflowError, ChannelEvent, ChannelOptions, Result,
common::{MemCache, Queue, Shutdown},
model::WorkflowModel,
runtime::{Channel, Process, ProcessId},
};
const PROCESS_CACHE_SIZE: usize = 2048;
const PROCESS_COMPLETE_QUEUE_SIZE: usize = 100;
pub struct Engine {
channel: Arc<Channel>,
procs_complete_queue: Arc<Queue<ProcessId>>,
procs: Arc<MemCache<ProcessId, Arc<Process>>>,
running: Arc<AtomicBool>,
runtime: Arc<Runtime>,
shutdown: Arc<Shutdown>,
}
impl Engine {
pub fn new(runtime: Arc<Runtime>) -> Self {
let channel = Arc::new(Channel::new(runtime.clone()));
let procs_complete_queue = Queue::new(PROCESS_COMPLETE_QUEUE_SIZE);
Self {
channel,
procs_complete_queue,
procs: Arc::new(MemCache::new(PROCESS_CACHE_SIZE)),
running: Arc::new(AtomicBool::new(false)),
runtime,
shutdown: Arc::new(Shutdown::new()),
}
}
pub fn launch(&self) {
if self.running.swap(true, Ordering::Relaxed) {
return;
}
self.channel.listen();
let procs_complete_queue = self.procs_complete_queue.clone();
ChannelEvent::channel(self.channel.clone(), ChannelOptions::default()).on_complete(move |pid| {
let _ = procs_complete_queue.send(pid);
});
let procs_complete_queue = self.procs_complete_queue.clone();
let shutdown = self.shutdown.clone();
let procs = self.procs.clone();
self.runtime.spawn(async move {
loop {
tokio::select! {
_ = shutdown.wait() => break,
Some(pid) = procs_complete_queue.next_async() => {
procs.remove(&pid);
}
}
}
});
}
pub fn shutdown(&self) {
if self.running.swap(false, Ordering::Relaxed) {
return;
}
self.shutdown.shutdown();
for (_, proc) in self.procs.iter() {
proc.abort();
}
self.channel.shutdown();
}
pub fn build_workflow_process(
&self,
workflow: &WorkflowModel,
) -> Result<Arc<Process>> {
if !self.running.load(Ordering::Relaxed) {
return Err(ActflowError::Engine("Engine is not running".to_string()));
}
let process = Process::new(workflow, self.channel.clone(), self.runtime.clone())?;
let process_id = process.id().to_string();
if self.procs.get(&process_id).is_some() {
return Err(ActflowError::Process(format!("Process {} already exists in cache", process_id)));
}
self.procs.set(process_id.clone(), process.clone());
Ok(process)
}
pub fn stop(
&self,
process_id: &str,
) -> Result<()> {
let process_id_string = process_id.to_string();
if let Some(process) = self.procs.get(&process_id_string) {
process.abort();
Ok(())
} else {
Err(ActflowError::Process(format!("Process {} not found", process_id)))
}
}
pub fn get_process(
&self,
process_id: &String,
) -> Option<Arc<Process>> {
self.procs.get(process_id)
}
pub fn channel(&self) -> Arc<Channel> {
self.channel.clone()
}
}