use crate::agent::Agent;
use crate::exec::event_action::ExecActionEvent;
use crate::exec::exec_agent_run::exec_run_agent;
use crate::exec::exec_cmd_xelf::exec_xelf_update;
use crate::exec::support::open_vscode;
use crate::exec::{
ExecStatusEvent,
RunRedoCtx,
exec_check_keys,
exec_install,
exec_list,
exec_new,
exec_pack,
exec_run,
exec_run_redo,
exec_xelf_setup, };
use crate::hub::get_hub;
use crate::init::{init_base, init_base_and_dir_context, init_wks};
use crate::runtime::Runtime;
use crate::{Error, Result};
use derive_more::derive::From;
use flume::{Receiver, Sender};
use simple_fs::SPath;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Mutex;
pub struct Executor {
action_rx: Receiver<ExecActionEvent>,
action_sender: ExecutorSender,
current_redo_ctx: Arc<Mutex<Option<RedoCtx>>>,
active_actions: Arc<AtomicUsize>,
}
impl Executor {
pub fn new() -> Self {
let (tx, rx) = flume::unbounded();
Executor {
action_rx: rx,
action_sender: ExecutorSender::new(tx),
current_redo_ctx: Default::default(),
active_actions: Arc::new(AtomicUsize::new(0)),
}
}
}
impl Executor {
pub fn sender(&self) -> ExecutorSender {
self.action_sender.clone()
}
async fn get_agent_file_path(&self) -> Option<SPath> {
let redo_ctx = self.current_redo_ctx.lock().await;
let path = redo_ctx
.as_ref()
.and_then(|r| r.get_agent())
.map(|a| a.file_path())
.map(SPath::new);
path
}
async fn set_current_redo_ctx(&self, redo_ctx: RedoCtx) {
let mut guard = self.current_redo_ctx.lock().await;
*guard = Some(redo_ctx);
}
async fn take_current_redo_ctx(&self) -> Option<RedoCtx> {
let mut guard = self.current_redo_ctx.lock().await;
guard.take()
}
fn increment_actions(&self) -> bool {
let prev_count = self.active_actions.fetch_add(1, Ordering::SeqCst);
prev_count == 0
}
fn decrement_actions(&self) -> bool {
let prev_count = self.active_actions.fetch_sub(1, Ordering::SeqCst);
prev_count == 1
}
}
impl Executor {
pub async fn start(self) -> Result<()> {
let executor = Arc::new(self);
loop {
let Ok(action) = executor.action_rx.recv_async().await else {
println!("!!!! Aipack Executor: Channel closed");
break;
};
let xt = executor.clone();
let action_str = action.as_str();
tokio::spawn(async move {
if let Err(err) = xt.perform_action(action).await {
get_hub()
.publish(format!("Fail to perform action '{action_str}'. Cause: {err}"))
.await;
}
});
}
Ok(())
}
async fn perform_action(&self, action: ExecActionEvent) -> Result<()> {
let hub = get_hub();
let is_first_action = self.increment_actions();
if is_first_action {
hub.publish(ExecStatusEvent::StartExec).await;
}
match action {
ExecActionEvent::CmdInit(init_args) => {
init_wks(init_args.path.as_deref(), true).await?;
init_base(false).await?;
}
ExecActionEvent::CmdInitBase => {
init_base(true).await?;
}
ExecActionEvent::CmdNewAgent(new_args) => {
exec_new(new_args, init_wks(None, false).await?).await?;
}
ExecActionEvent::CmdList(list_args) => {
exec_list(init_base_and_dir_context(false).await?, list_args).await?
}
ExecActionEvent::CmdPack(pack_args) => exec_pack(&pack_args).await?,
ExecActionEvent::CmdInstall(install_args) => {
exec_install(init_base_and_dir_context(false).await?, install_args).await?
}
ExecActionEvent::CmdRun(run_args) => {
hub.publish(ExecStatusEvent::RunStart).await;
init_base(false).await?;
let dir_ctx = init_wks(None, false).await?;
let exec_sender = self.sender();
let runtime = Runtime::new(dir_ctx, exec_sender)?;
let redo = exec_run(run_args, runtime).await?;
self.set_current_redo_ctx(redo.into()).await;
hub.publish(ExecStatusEvent::RunEnd).await;
}
ExecActionEvent::CmdCheckKeys(args) => {
exec_check_keys(args).await?;
}
ExecActionEvent::CmdXelfSetup(args) => {
exec_xelf_setup(args).await?;
}
ExecActionEvent::CmdXelfUpdate(args) => {
exec_xelf_update(args).await?;
}
ExecActionEvent::Redo => {
if let Some(redo_ctx) = self.take_current_redo_ctx().await {
hub.publish(ExecStatusEvent::RunStart).await;
match redo_ctx {
RedoCtx::RunRedoCtx(redo_ctx_orig) => {
if let Some(redo_ctx) = exec_run_redo(&redo_ctx_orig).await {
self.set_current_redo_ctx(redo_ctx.into()).await;
}
else {
self.set_current_redo_ctx(redo_ctx_orig.into()).await;
}
}
}
} else {
hub.publish(Error::custom("No redo available to be performed")).await;
}
hub.publish(ExecStatusEvent::RunEnd).await;
}
ExecActionEvent::OpenAgent => {
if let Some(agent_file_path) = self.get_agent_file_path().await {
open_vscode(agent_file_path).await
}
}
ExecActionEvent::RunAgent(run_agent_params) => {
if let Err(err) = exec_run_agent(run_agent_params).await {
hub.publish(Error::cc("Fail to run agent", err)).await;
}
}
}
let is_last_action = self.decrement_actions();
if is_last_action {
hub.publish(ExecStatusEvent::EndExec).await;
}
Ok(())
}
}
#[derive(From)]
enum RedoCtx {
RunRedoCtx(Arc<RunRedoCtx>),
}
impl From<RunRedoCtx> for RedoCtx {
fn from(run_redo_ctx: RunRedoCtx) -> Self {
RedoCtx::RunRedoCtx(run_redo_ctx.into())
}
}
impl RedoCtx {
pub fn get_agent(&self) -> Option<&Agent> {
match self {
RedoCtx::RunRedoCtx(redo_ctx) => Some(redo_ctx.agent()),
}
}
}
#[derive(Debug, Clone)]
pub struct ExecutorSender {
tx: Sender<ExecActionEvent>,
}
impl ExecutorSender {
fn new(tx: Sender<ExecActionEvent>) -> Self {
ExecutorSender { tx }
}
pub async fn send(&self, event: ExecActionEvent) {
let event_str: &'static str = (&event).into();
if let Err(err) = self.tx.send_async(event).await {
get_hub()
.publish(Error::cc(format!("Fail to send action event {}", event_str), err))
.await;
};
}
}