use std::{
sync::Arc,
time::Duration,
};
use async_trait::async_trait;
use tokio::{
sync::{
mpsc::{UnboundedSender, unbounded_channel},
Notify,
},
process::{ChildStdin, Command},
task::yield_now,
};
use futures::FutureExt;
use nvim_rs::{
self,
create::tokio as create,
compat::tokio::Compat,
neovim::Neovim,
Value,
};
mod common;
use common::*;
const ITERS: usize = 25;
const TIMEOUT: Duration = Duration::from_secs(60);
macro_rules! timeout {
($x:expr) => {
tokio::time::timeout(TIMEOUT, $x).map(|res| {
res.expect(&format!("Timed out waiting for {}", stringify!($x)))
})
}
}
#[derive(Clone)]
struct Handler {
result_sender: UnboundedSender<u8>,
notifiers: Arc<Vec<Notify>>,
}
#[async_trait]
impl nvim_rs::Handler for Handler {
type Writer = Compat<ChildStdin>;
async fn handle_notify(
&self,
name: String,
args: Vec<Value>,
_: Neovim<Self::Writer>,
) {
assert_eq!(name, "idx");
let idx = args[0].as_i64().unwrap();
self.notifiers[idx as usize].notified().await;
self.result_sender.send(idx as u8).unwrap();
}
}
#[tokio::test]
async fn sequential_notifications() {
let mut notifiers = Vec::<Notify>::with_capacity(ITERS);
for _ in 0..ITERS {
notifiers.push(Notify::new());
}
let notifiers = Arc::new(notifiers);
let (result_sender, mut result_receiver) = unbounded_channel();
let handler = Handler {
result_sender,
notifiers: notifiers.clone(),
};
let (nvim, _io_handler, _child) = create::new_child_cmd(
Command::new(nvim_path()).args(&[
"-u",
"NONE",
"--embed",
"--headless",
]),
handler
)
.await
.unwrap();
let mut nvim_cmds = Vec::<String>::with_capacity(ITERS);
for i in 0..ITERS {
nvim_cmds.push(format!("call rpcnotify(1, 'idx', {})", i));
}
timeout!(nvim.command(nvim_cmds.join("|").as_str())).await.unwrap();
for notifier in notifiers.iter().rev() {
notifier.notify_one();
yield_now().await;
}
for i in 0..ITERS {
assert_eq!(i as u8, timeout!(result_receiver.recv()).await.unwrap());
}
}