use super::{Cmd, ExecRequest};
use crate::renderer::registry::queue_exec_request;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
/// Cloneable handle used to ask for a render.
///
/// Wraps the sending half of an unbounded channel that carries unit values.
#[derive(Clone)]
pub struct RenderHandle {
/// Sending half of the render-request channel; every `()` sent is one request.
tx: mpsc::UnboundedSender<()>,
}
impl RenderHandle {
pub fn new(tx: mpsc::UnboundedSender<()>) -> Self {
Self { tx }
}
pub fn request(&self) {
let _ = self.tx.send(());
}
}
/// Executes `Cmd` values on a Tokio runtime and signals render requests
/// through a `RenderHandle`.
pub struct CmdExecutor {
/// Runtime that spawned work runs on. It is `None` only after `shutdown` or
/// `Drop` has taken it out.
runtime: Option<Arc<tokio::runtime::Runtime>>,
/// Handle used to request a render once commands have run.
render_handle: RenderHandle,
}
impl CmdExecutor {
pub fn new(render_tx: mpsc::UnboundedSender<()>) -> Self {
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2) .thread_name("rnk-cmd-executor")
.enable_all()
.build()
.expect("Failed to create Tokio runtime");
Self {
runtime: Some(Arc::new(runtime)),
render_handle: RenderHandle::new(render_tx),
}
}
/// Runs `cmd` with render notification enabled.
///
/// This is the public entry point; it forwards to `execute_internal`.
pub fn execute(&self, cmd: Cmd) {
    // Top-level commands always ask for a render when they are done.
    let notify_render = true;
    self.execute_internal(cmd, notify_render);
}
/// Runs `cmd`, sending a render request afterwards when `notify_render` is true.
///
/// Asynchronous variants are spawned on the executor's runtime; this method
/// returns without waiting for them.
///
/// * `Cmd::None` does nothing and never requests a render.
/// * `Cmd::Batch` dispatches every child with notifications suppressed, then
///   issues a single request right after dispatch (not after completion).
/// * `Cmd::Sequence` runs its children one at a time, waiting for each to
///   signal completion, and requests a render after the last one.
/// * `Cmd::Perform`, `Cmd::Tick` and `Cmd::Every` request a render once their
///   asynchronous work has finished.
/// * `Cmd::Sleep` waits, then either requests a render (`then` is `Cmd::None`)
///   or runs `then` with the same `notify_render` flag.
/// * `Cmd::Exec` is queued via `queue_exec_request`; no render is requested here.
/// * Terminal commands are queued via `queue_terminal_cmd`, followed by a request.
///
/// # Panics
///
/// Panics if the runtime has already been taken out of `self`.
fn execute_internal(&self, cmd: Cmd, notify_render: bool) {
    let runtime = self.runtime.as_ref().expect("executor was shutdown");
    let render_handle = self.render_handle.clone();

    match cmd {
        Cmd::None => {}

        Cmd::Batch(cmds) => {
            // Children stay silent so the whole batch produces at most one
            // render request, sent as soon as everything has been dispatched.
            for child in cmds {
                self.execute_internal(child, false);
            }
            if notify_render {
                render_handle.request();
            }
        }

        Cmd::Sequence(cmds) => {
            if cmds.is_empty() {
                if notify_render {
                    render_handle.request();
                }
                return;
            }
            // The task's runtime reference lives inside a `CmdExecutor`, so it is
            // always released through `CmdExecutor::drop` (which shuts down with
            // `shutdown_background`). A bare `Arc<Runtime>` dropped as the last
            // reference inside a task would run the runtime's blocking drop in
            // async context.
            let stepper = CmdExecutor {
                runtime: Some(Arc::clone(runtime)),
                render_handle,
            };
            runtime.spawn(async move {
                for step in cmds {
                    let (tx, rx) = tokio::sync::oneshot::channel();
                    stepper.execute_with_completion(step, tx);
                    // A dropped sender also ends the wait; move on either way.
                    let _ = rx.await;
                }
                if notify_render {
                    stepper.render_handle.request();
                }
            });
        }

        Cmd::Perform { future } => {
            runtime.spawn(async move {
                future.await;
                if notify_render {
                    render_handle.request();
                }
            });
        }

        Cmd::Sleep { duration, then } => {
            // Same ownership rule as `Sequence`: the runtime reference is held by
            // a `CmdExecutor`, never as a bare `Arc`.
            let follow_up = CmdExecutor {
                runtime: Some(Arc::clone(runtime)),
                render_handle,
            };
            runtime.spawn(async move {
                tokio::time::sleep(duration).await;
                match *then {
                    Cmd::None => {
                        if notify_render {
                            follow_up.render_handle.request();
                        }
                    }
                    other => follow_up.execute_internal(other, notify_render),
                }
            });
        }

        Cmd::Tick { duration, msg_fn } => {
            runtime.spawn(async move {
                tokio::time::sleep(duration).await;
                let timestamp = Instant::now();
                let _ = msg_fn(timestamp);
                if notify_render {
                    render_handle.request();
                }
            });
        }

        Cmd::Every { duration, msg_fn } => {
            runtime.spawn(async move {
                // A zero interval has no boundary to align to (and would be a
                // zero divisor below), so fire immediately.
                if duration.is_zero() {
                    let _ = msg_fn(Instant::now());
                    if notify_render {
                        render_handle.request();
                    }
                    return;
                }

                // Wait until the next multiple of `duration` counted from the
                // Unix epoch. A clock before the epoch is treated as the epoch.
                let since_epoch = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default();

                // Stay in u128: casting `as_nanos()` to u64 silently truncates
                // long intervals and can even yield a zero divisor.
                const NANOS_PER_SEC: u128 = 1_000_000_000;
                let interval_nanos = duration.as_nanos();
                let wait_nanos = interval_nanos - since_epoch.as_nanos() % interval_nanos;
                // `wait_nanos <= interval_nanos`, so the seconds part fits in u64;
                // the sub-second part is always below one billion.
                let wait_duration = std::time::Duration::new(
                    u64::try_from(wait_nanos / NANOS_PER_SEC).unwrap_or(u64::MAX),
                    (wait_nanos % NANOS_PER_SEC) as u32,
                );

                tokio::time::sleep(wait_duration).await;
                let timestamp = Instant::now();
                let _ = msg_fn(timestamp);
                if notify_render {
                    render_handle.request();
                }
            });
        }

        Cmd::Exec { config, msg_fn } => {
            // Only queued here; the callback forwards the result to `msg_fn`.
            // No render request is issued from this arm.
            queue_exec_request(ExecRequest {
                config,
                callback: Box::new(move |result| {
                    let _ = msg_fn(result);
                }),
            });
        }

        Cmd::ClearScreen
        | Cmd::HideCursor
        | Cmd::ShowCursor
        | Cmd::SetWindowTitle(_)
        | Cmd::WindowSize
        | Cmd::EnterAltScreen
        | Cmd::ExitAltScreen
        | Cmd::EnableMouse
        | Cmd::DisableMouse
        | Cmd::EnableBracketedPaste
        | Cmd::DisableBracketedPaste => {
            crate::renderer::registry::queue_terminal_cmd(cmd);
            if notify_render {
                render_handle.request();
            }
        }
    }
}
/// Runs `cmd` and sends `()` on `completion` once it has finished.
///
/// Unlike `execute_internal`, this never requests a render; callers use the
/// completion signal to order work (for example the steps of a sequence).
///
/// * `Cmd::None` and terminal commands signal completion immediately (terminal
///   commands are queued via `queue_terminal_cmd` first).
/// * `Cmd::Batch` runs every child in its own task and signals once all of
///   those tasks have ended.
/// * `Cmd::Sequence` runs its children one after another and signals after the
///   last one.
/// * `Cmd::Perform`, `Cmd::Sleep`, `Cmd::Tick` and `Cmd::Every` signal once
///   their asynchronous work (including `Sleep`'s follow-up command) is done.
/// * `Cmd::Exec` runs the process synchronously on the calling thread.
///
/// A failed `completion.send` (receiver already dropped) is ignored.
///
/// # Panics
///
/// Panics if the runtime has already been taken out of `self`.
fn execute_with_completion(&self, cmd: Cmd, completion: tokio::sync::oneshot::Sender<()>) {
    let runtime = self.runtime.as_ref().expect("executor was shutdown");
    let render_handle = self.render_handle.clone();

    match cmd {
        Cmd::None => {
            let _ = completion.send(());
        }

        Cmd::Batch(cmds) => {
            // Every runtime reference used by these tasks lives inside a
            // `CmdExecutor`, so it is always released through
            // `CmdExecutor::drop` (which shuts down with `shutdown_background`).
            // A bare `Arc<Runtime>` dropped as the last reference inside a task
            // would run the runtime's blocking drop in async context.
            let parent = CmdExecutor {
                runtime: Some(Arc::clone(runtime)),
                render_handle,
            };
            runtime.spawn(async move {
                let mut handles = Vec::with_capacity(cmds.len());
                for cmd in cmds {
                    let child = CmdExecutor {
                        runtime: parent.runtime.clone(),
                        render_handle: parent.render_handle.clone(),
                    };
                    handles.push(tokio::spawn(async move {
                        let (tx, rx) = tokio::sync::oneshot::channel();
                        child.execute_with_completion(cmd, tx);
                        let _ = rx.await;
                    }));
                }
                // A child task that panicked or was cancelled still counts as
                // finished for the purpose of the completion signal.
                for handle in handles {
                    let _ = handle.await;
                }
                let _ = completion.send(());
            });
        }

        Cmd::Sequence(cmds) => {
            let stepper = CmdExecutor {
                runtime: Some(Arc::clone(runtime)),
                render_handle,
            };
            runtime.spawn(async move {
                for step in cmds {
                    let (tx, rx) = tokio::sync::oneshot::channel();
                    stepper.execute_with_completion(step, tx);
                    let _ = rx.await;
                }
                let _ = completion.send(());
            });
        }

        Cmd::Perform { future } => {
            runtime.spawn(async move {
                future.await;
                let _ = completion.send(());
            });
        }

        Cmd::Sleep { duration, then } => {
            let follow_up = CmdExecutor {
                runtime: Some(Arc::clone(runtime)),
                render_handle,
            };
            runtime.spawn(async move {
                tokio::time::sleep(duration).await;
                match *then {
                    Cmd::None => {}
                    other => {
                        let (tx, rx) = tokio::sync::oneshot::channel();
                        follow_up.execute_with_completion(other, tx);
                        let _ = rx.await;
                    }
                }
                let _ = completion.send(());
            });
        }

        Cmd::Tick { duration, msg_fn } => {
            runtime.spawn(async move {
                tokio::time::sleep(duration).await;
                let timestamp = Instant::now();
                let _ = msg_fn(timestamp);
                let _ = completion.send(());
            });
        }

        Cmd::Every { duration, msg_fn } => {
            runtime.spawn(async move {
                // A zero interval has no boundary to align to (and would be a
                // zero divisor below), so fire immediately.
                if duration.is_zero() {
                    let _ = msg_fn(Instant::now());
                    let _ = completion.send(());
                    return;
                }

                // Wait until the next multiple of `duration` counted from the
                // Unix epoch. A clock before the epoch is treated as the epoch.
                let since_epoch = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default();

                // Stay in u128: casting `as_nanos()` to u64 silently truncates
                // long intervals and can even yield a zero divisor.
                const NANOS_PER_SEC: u128 = 1_000_000_000;
                let interval_nanos = duration.as_nanos();
                let wait_nanos = interval_nanos - since_epoch.as_nanos() % interval_nanos;
                // `wait_nanos <= interval_nanos`, so the seconds part fits in u64;
                // the sub-second part is always below one billion.
                let wait_duration = std::time::Duration::new(
                    u64::try_from(wait_nanos / NANOS_PER_SEC).unwrap_or(u64::MAX),
                    (wait_nanos % NANOS_PER_SEC) as u32,
                );

                tokio::time::sleep(wait_duration).await;
                let timestamp = Instant::now();
                let _ = msg_fn(timestamp);
                let _ = completion.send(());
            });
        }

        Cmd::Exec { config, msg_fn } => {
            // NOTE(review): this blocks the calling thread until the process
            // exits. When reached from a `Sequence`/`Batch` task that thread is a
            // runtime worker; consider routing through the exec queue or a
            // blocking task if `config`/`msg_fn` allow it.
            let result = execute_process_sync(&config);
            let _ = msg_fn(result);
            let _ = completion.send(());
        }

        Cmd::ClearScreen
        | Cmd::HideCursor
        | Cmd::ShowCursor
        | Cmd::SetWindowTitle(_)
        | Cmd::WindowSize
        | Cmd::EnterAltScreen
        | Cmd::ExitAltScreen
        | Cmd::EnableMouse
        | Cmd::DisableMouse
        | Cmd::EnableBracketedPaste
        | Cmd::DisableBracketedPaste => {
            crate::renderer::registry::queue_terminal_cmd(cmd);
            let _ = completion.send(());
        }
    }
}
pub fn render_handle(&self) -> RenderHandle {
self.render_handle.clone()
}
pub fn shutdown(mut self) {
if let Some(runtime) = self.runtime.take() {
if let Ok(runtime) = Arc::try_unwrap(runtime) {
runtime.shutdown_background();
}
}
}
}
impl Drop for CmdExecutor {
    /// Releases this executor's runtime reference and, when it was the last
    /// one, shuts the runtime down with `shutdown_background`.
    ///
    /// `shutdown_background` does not wait for outstanding work, which is what
    /// allows an executor to be dropped from inside asynchronous code.
    fn drop(&mut self) {
        // `Arc::into_inner` guarantees that when several holders release their
        // references concurrently, exactly one of them receives the runtime.
        // With `Arc::try_unwrap` every racing caller can fail, after which the
        // runtime is dropped implicitly instead of via `shutdown_background`.
        if let Some(runtime) = self.runtime.take().and_then(Arc::into_inner) {
            runtime.shutdown_background();
        }
    }
}
fn execute_process_sync(config: &super::ExecConfig) -> super::ExecResult {
use std::process::{Command, Stdio};
let mut cmd = Command::new(&config.command);
cmd.args(&config.args);
for (key, value) in &config.env {
cmd.env(key, value);
}
if let Some(ref dir) = config.current_dir {
cmd.current_dir(dir);
}
cmd.stdin(Stdio::inherit());
cmd.stdout(Stdio::inherit());
cmd.stderr(Stdio::inherit());
match cmd.status() {
Ok(status) => {
if let Some(code) = status.code() {
super::ExecResult::success(code)
} else {
super::ExecResult::terminated_by_signal()
}
}
Err(e) => super::ExecResult::error(e.to_string()),
}
}
/// Runs the external process described by `config` and returns its result.
///
/// Public wrapper that forwards directly to `execute_process_sync`, so the
/// call blocks until the process has exited.
pub fn run_exec_process(config: &super::ExecConfig) -> super::ExecResult {
execute_process_sync(config)
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::time::Duration;
/// Constructing an executor (and dropping it at the end of the test) must not panic.
#[tokio::test]
async fn test_executor_creation() {
    let (render_tx, _render_rx) = mpsc::unbounded_channel();
    let _executor = CmdExecutor::new(render_tx);
}
/// `Cmd::None` must not produce a render request.
#[tokio::test]
async fn test_execute_none() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
executor.execute(Cmd::None);
// Give any (unexpected) asynchronous work time to run before checking.
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(rx.try_recv().is_err());
}
/// A `perform` command runs its future and then requests a render.
#[tokio::test]
async fn test_execute_perform() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let flag = Arc::new(AtomicBool::new(false));
let flag_clone = Arc::clone(&flag);
executor.execute(Cmd::perform(move || async move {
flag_clone.store(true, Ordering::SeqCst);
}));
// The render request arrives only after the future has completed.
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
assert!(flag.load(Ordering::SeqCst));
}
/// A 100 ms sleep requests a render no earlier than 100 ms and well under 500 ms.
#[tokio::test]
async fn test_execute_sleep() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
// Start timing before dispatch so the lower bound covers the full sleep.
let start = std::time::Instant::now();
executor.execute(Cmd::sleep(Duration::from_millis(100)));
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(100));
assert!(elapsed < Duration::from_millis(500)); }
/// The command chained after a sleep has run by the time the render request arrives.
#[tokio::test]
async fn test_execute_sleep_and_then() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let flag = Arc::new(AtomicBool::new(false));
let flag_clone = Arc::clone(&flag);
let cmd =
Cmd::sleep(Duration::from_millis(100)).and_then(Cmd::perform(move || async move {
flag_clone.store(true, Ordering::SeqCst);
}));
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
assert!(flag.load(Ordering::SeqCst));
}
/// A batch of three `perform` commands runs all of them and sends exactly one
/// render request.
#[tokio::test]
async fn test_execute_batch() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let counter = Arc::new(AtomicU32::new(0));
let c1 = Arc::clone(&counter);
let c2 = Arc::clone(&counter);
let c3 = Arc::clone(&counter);
let cmd = Cmd::batch(vec![
Cmd::perform(move || async move {
c1.fetch_add(1, Ordering::SeqCst);
}),
Cmd::perform(move || async move {
c2.fetch_add(1, Ordering::SeqCst);
}),
Cmd::perform(move || async move {
c3.fetch_add(1, Ordering::SeqCst);
}),
]);
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
// The batch request is sent at dispatch time, so allow the children to finish.
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 3);
// No second render request may have been queued by the children.
assert!(rx.try_recv().is_err());
}
/// Both members of a batch run, including one that is delayed by a sleep.
#[tokio::test]
async fn test_batch_with_sleep() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let flag1 = Arc::new(AtomicBool::new(false));
let flag2 = Arc::new(AtomicBool::new(false));
let f1 = Arc::clone(&flag1);
let f2 = Arc::clone(&flag2);
let cmd = Cmd::batch(vec![
Cmd::sleep(Duration::from_millis(50)).and_then(Cmd::perform(move || async move {
f1.store(true, Ordering::SeqCst);
})),
Cmd::perform(move || async move {
f2.store(true, Ordering::SeqCst);
}),
]);
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
// Wait past the 50 ms sleep so the delayed member has had time to run.
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(flag1.load(Ordering::SeqCst));
assert!(flag2.load(Ordering::SeqCst));
}
/// A request made through the executor's render handle reaches the receiver.
#[tokio::test]
async fn test_render_handle() {
    let (render_tx, mut render_rx) = mpsc::unbounded_channel();
    let executor = CmdExecutor::new(render_tx);

    executor.render_handle().request();

    let received = tokio::time::timeout(Duration::from_secs(1), render_rx.recv()).await;
    received.expect("timeout").expect("channel closed");
}
/// A handle and its clone both deliver requests to the same receiver.
#[tokio::test]
async fn test_render_handle_clone() {
    let (render_tx, mut render_rx) = mpsc::unbounded_channel();
    let executor = CmdExecutor::new(render_tx);
    let original = executor.render_handle();
    let cloned = original.clone();

    original.request();
    cloned.request();

    // One message per request must arrive.
    for _ in 0..2 {
        tokio::time::timeout(Duration::from_secs(1), render_rx.recv())
            .await
            .expect("timeout")
            .expect("channel closed");
    }
}
/// Two chained 50 ms sleeps followed by a `perform` take at least 100 ms in
/// total, and the `perform` has run when the render request arrives.
#[tokio::test]
async fn test_nested_sleep_chain() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let flag = Arc::new(AtomicBool::new(false));
let f = Arc::clone(&flag);
let cmd = Cmd::sleep(Duration::from_millis(50))
.and_then(Cmd::sleep(Duration::from_millis(50)))
.and_then(Cmd::perform(move || async move {
f.store(true, Ordering::SeqCst);
}));
let start = std::time::Instant::now();
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(100));
assert!(flag.load(Ordering::SeqCst));
}
/// Shutting an executor down from inside an async test must not panic.
#[tokio::test]
async fn test_executor_shutdown() {
    let (render_tx, _render_rx) = mpsc::unbounded_channel();
    CmdExecutor::new(render_tx).shutdown();
}
/// Ten independently executed `perform` commands each run once and each send
/// their own render request.
#[tokio::test]
async fn test_concurrent_executions() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let counter = Arc::new(AtomicU32::new(0));
for _ in 0..10 {
let c = Arc::clone(&counter);
executor.execute(Cmd::perform(move || async move {
c.fetch_add(1, Ordering::SeqCst);
}));
}
// Expect one render request per executed command.
for _ in 0..10 {
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
}
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 10);
}
/// Executing `Cmd::batch(vec![])` must not produce a render request.
///
/// NOTE(review): `execute_internal` does request a render for an empty
/// `Cmd::Batch`, so this presumably relies on `Cmd::batch` collapsing an empty
/// list into a command that does nothing — confirm against `Cmd::batch`.
#[tokio::test]
async fn test_empty_batch() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
executor.execute(Cmd::batch(vec![]));
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(rx.try_recv().is_err());
}
/// The members of a sequence run strictly in the order they were listed.
#[tokio::test]
async fn test_execute_sequence() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
// Records the order in which the three steps actually ran.
let order = Arc::new(std::sync::Mutex::new(Vec::new()));
let o1 = Arc::clone(&order);
let o2 = Arc::clone(&order);
let o3 = Arc::clone(&order);
let cmd = Cmd::sequence(vec![
Cmd::perform(move || async move {
o1.lock().unwrap().push(1);
}),
Cmd::perform(move || async move {
o2.lock().unwrap().push(2);
}),
Cmd::perform(move || async move {
o3.lock().unwrap().push(3);
}),
]);
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(2), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
tokio::time::sleep(Duration::from_millis(100)).await;
let result = order.lock().unwrap().clone();
assert_eq!(result, vec![1, 2, 3]);
}
/// A sequence waits for a delayed first step before running the second one.
#[tokio::test]
async fn test_sequence_with_sleep() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let order = Arc::new(std::sync::Mutex::new(Vec::new()));
let o1 = Arc::clone(&order);
let o2 = Arc::clone(&order);
let cmd = Cmd::sequence(vec![
Cmd::sleep(Duration::from_millis(50)).and_then(Cmd::perform(move || async move {
o1.lock().unwrap().push(1);
})),
Cmd::perform(move || async move {
o2.lock().unwrap().push(2);
}),
]);
let start = std::time::Instant::now();
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(2), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
tokio::time::sleep(Duration::from_millis(100)).await;
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(50));
// The delayed step must still have been recorded first.
let result = order.lock().unwrap().clone();
assert_eq!(result, vec![1, 2]);
}
/// Three 50 ms sleeps in a sequence add up: the render request arrives after
/// at least 150 ms and before 500 ms.
#[tokio::test]
async fn test_sequence_timing() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let cmd = Cmd::sequence(vec![
Cmd::sleep(Duration::from_millis(50)),
Cmd::sleep(Duration::from_millis(50)),
Cmd::sleep(Duration::from_millis(50)),
]);
let start = std::time::Instant::now();
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(2), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(150));
assert!(elapsed < Duration::from_millis(500));
}
/// Executing `Cmd::sequence(vec![])` must not produce a render request.
///
/// NOTE(review): `execute_internal` does request a render for an empty
/// `Cmd::Sequence`, so this presumably relies on `Cmd::sequence` collapsing an
/// empty list into a command that does nothing — confirm against `Cmd::sequence`.
#[tokio::test]
async fn test_empty_sequence() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
executor.execute(Cmd::sequence(vec![]));
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(rx.try_recv().is_err());
}
/// Contrasts the two combinators on identical 50 ms sleeps: the sequence's
/// render request takes at least 100 ms, the batch's arrives in under 100 ms.
#[tokio::test]
async fn test_sequence_vs_batch_timing() {
// Separate executors and channels keep the two measurements independent.
let (tx1, mut rx1) = mpsc::unbounded_channel();
let executor1 = CmdExecutor::new(tx1);
let (tx2, mut rx2) = mpsc::unbounded_channel();
let executor2 = CmdExecutor::new(tx2);
let batch_cmd = Cmd::batch(vec![
Cmd::sleep(Duration::from_millis(50)),
Cmd::sleep(Duration::from_millis(50)),
]);
let seq_cmd = Cmd::sequence(vec![
Cmd::sleep(Duration::from_millis(50)),
Cmd::sleep(Duration::from_millis(50)),
]);
let batch_start = std::time::Instant::now();
executor1.execute(batch_cmd);
tokio::time::timeout(Duration::from_secs(2), rx1.recv())
.await
.expect("timeout")
.expect("channel closed");
let batch_elapsed = batch_start.elapsed();
let seq_start = std::time::Instant::now();
executor2.execute(seq_cmd);
tokio::time::timeout(Duration::from_secs(2), rx2.recv())
.await
.expect("timeout")
.expect("channel closed");
let seq_elapsed = seq_start.elapsed();
assert!(seq_elapsed >= Duration::from_millis(100));
assert!(batch_elapsed < Duration::from_millis(100));
}
/// Sequences nested inside a batch all run: three `perform` steps spread over
/// two sequences increment the counter three times.
#[tokio::test]
async fn test_nested_sequence_in_batch() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let counter = Arc::new(AtomicU32::new(0));
let c1 = Arc::clone(&counter);
let c2 = Arc::clone(&counter);
let c3 = Arc::clone(&counter);
let cmd = Cmd::batch(vec![
Cmd::sequence(vec![Cmd::perform(move || async move {
c1.fetch_add(1, Ordering::SeqCst);
})]),
Cmd::sequence(vec![
Cmd::perform(move || async move {
c2.fetch_add(1, Ordering::SeqCst);
}),
Cmd::perform(move || async move {
c3.fetch_add(1, Ordering::SeqCst);
}),
]),
]);
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(2), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
// The batch request is sent at dispatch time, so allow the steps to finish.
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(counter.load(Ordering::SeqCst), 3);
}
#[tokio::test]
async fn test_execute_tick() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let flag = Arc::new(AtomicBool::new(false));
let flag_clone = Arc::clone(&flag);
let timestamp_received = Arc::new(std::sync::Mutex::new(None));
let ts_clone = Arc::clone(×tamp_received);
let cmd = Cmd::tick(Duration::from_millis(50), move |ts| {
flag_clone.store(true, Ordering::SeqCst);
*ts_clone.lock().unwrap() = Some(ts);
});
let start = std::time::Instant::now();
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(50));
assert!(flag.load(Ordering::SeqCst));
assert!(timestamp_received.lock().unwrap().is_some());
}
/// A 100 ms tick requests a render after at least 100 ms and before 200 ms.
#[tokio::test]
async fn test_tick_timing() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let cmd = Cmd::tick(Duration::from_millis(100), |_| {});
let start = std::time::Instant::now();
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(100));
assert!(elapsed < Duration::from_millis(200));
}
/// Two ticks with different delays in one batch both fire.
#[tokio::test]
async fn test_multiple_ticks() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let counter = Arc::new(AtomicU32::new(0));
let c1 = Arc::clone(&counter);
let c2 = Arc::clone(&counter);
let cmd = Cmd::batch(vec![
Cmd::tick(Duration::from_millis(50), move |_| {
c1.fetch_add(1, Ordering::SeqCst);
}),
Cmd::tick(Duration::from_millis(100), move |_| {
c2.fetch_add(1, Ordering::SeqCst);
}),
]);
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
// Wait longer than the slower (100 ms) tick before counting.
tokio::time::sleep(Duration::from_millis(150)).await;
assert_eq!(counter.load(Ordering::SeqCst), 2);
}
/// An `every` command with a 100 ms interval has invoked its callback by the
/// time the render request arrives.
#[tokio::test]
async fn test_execute_every() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let flag = Arc::new(AtomicBool::new(false));
let flag_clone = Arc::clone(&flag);
let cmd = Cmd::every(Duration::from_millis(100), move |_| {
flag_clone.store(true, Ordering::SeqCst);
});
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
assert!(flag.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_every_receives_timestamp() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let timestamp_received = Arc::new(std::sync::Mutex::new(None));
let ts_clone = Arc::clone(×tamp_received);
let cmd = Cmd::every(Duration::from_millis(50), move |ts| {
*ts_clone.lock().unwrap() = Some(ts);
});
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
assert!(timestamp_received.lock().unwrap().is_some());
}
/// A zero interval must neither panic nor wait: the callback runs and the
/// render request arrives within 100 ms.
#[tokio::test]
async fn test_every_zero_duration_is_safe_and_immediate() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let flag = Arc::new(AtomicBool::new(false));
let flag_clone = Arc::clone(&flag);
let start = std::time::Instant::now();
executor.execute(Cmd::every(Duration::ZERO, move |_| {
flag_clone.store(true, Ordering::SeqCst);
}));
tokio::time::timeout(Duration::from_millis(200), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
assert!(flag.load(Ordering::SeqCst));
assert!(start.elapsed() < Duration::from_millis(100));
}
/// A sequence waits for a tick's callback before running the following step.
#[tokio::test]
async fn test_sequence_with_tick() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let order = Arc::new(std::sync::Mutex::new(Vec::new()));
let o1 = Arc::clone(&order);
let o2 = Arc::clone(&order);
let cmd = Cmd::sequence(vec![
Cmd::tick(Duration::from_millis(50), move |_| {
o1.lock().unwrap().push(1);
}),
Cmd::perform(move || async move {
o2.lock().unwrap().push(2);
}),
]);
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(2), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
tokio::time::sleep(Duration::from_millis(100)).await;
// The delayed tick must still have been recorded before the perform step.
let result = order.lock().unwrap().clone();
assert_eq!(result, vec![1, 2]);
}
/// A batch mixing a sequence (tick, then perform) with a standalone tick runs
/// all three callbacks.
#[tokio::test]
async fn test_batch_with_sequence_and_tick() {
let (tx, mut rx) = mpsc::unbounded_channel();
let executor = CmdExecutor::new(tx);
let counter = Arc::new(AtomicU32::new(0));
let c1 = Arc::clone(&counter);
let c2 = Arc::clone(&counter);
let c3 = Arc::clone(&counter);
let cmd = Cmd::batch(vec![
Cmd::sequence(vec![
Cmd::tick(Duration::from_millis(25), move |_| {
c1.fetch_add(1, Ordering::SeqCst);
}),
Cmd::perform(move || async move {
c2.fetch_add(1, Ordering::SeqCst);
}),
]),
Cmd::tick(Duration::from_millis(50), move |_| {
c3.fetch_add(1, Ordering::SeqCst);
}),
]);
executor.execute(cmd);
tokio::time::timeout(Duration::from_secs(2), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
// The batch request is sent at dispatch time, so wait past the slowest tick.
tokio::time::sleep(Duration::from_millis(150)).await;
assert_eq!(counter.load(Ordering::SeqCst), 3);
}
}