use futures::Stream;
use rmpv::Value;
use std::{
pin::Pin,
sync::{atomic::AtomicU64, Arc},
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::{
mpsc::{channel, Receiver, Sender},
oneshot, Mutex,
},
};
use tokio_util::{
codec::{FramedRead, FramedWrite},
sync::CancellationToken,
};
use crate::msgpack::{Message, MessagePackCodec};
/// Event type for traffic that originates from the peer.
///
/// NOTE(review): nothing in this file constructs or consumes `Event` yet, so the
/// field meanings below are inferred from the field types only — confirm against
/// the code that ends up producing these values.
#[derive(Debug)]
pub enum Event {
/// Presumably an incoming request: method name, positional parameters, and a
/// one-shot channel through which the handler reports either a result (`Ok`)
/// or an error value (`Err`) — TODO confirm.
Request(String, Vec<Value>, oneshot::Sender<Result<Value, Value>>),
/// Presumably an incoming notification: method name and positional
/// parameters; there is no reply channel — TODO confirm.
Notification(String, Vec<Value>),
}
/// Errors reported by the client API in this module.
///
/// The type implements [`std::fmt::Display`] and [`std::error::Error`], so it can
/// be boxed as `dyn Error` and propagated with `?` through standard error
/// containers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Error {
    /// Not constructed anywhere in this file; presumably reserved for errors
    /// reported by Neovim itself — TODO confirm.
    NeovimError,
    /// Not constructed anywhere in this file; presumably reserved for failures
    /// to send a message — TODO confirm.
    SendError,
    /// An internal channel was closed: either the outgoing queue no longer
    /// accepts messages, or the response channel of a request was dropped
    /// before a reply arrived.
    InternalError,
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Error::NeovimError => "neovim reported an error",
            Error::SendError => "failed to send message",
            Error::InternalError => "internal error",
        };
        f.write_str(text)
    }
}

impl std::error::Error for Error {}
/// State shared, through an `Arc`, between a `Neovim` handle and the reader and
/// writer tasks spawned by `Neovim::new`.
pub struct Shared {
/// Counter that `fetch_next_id` reads and increments to hand out request ids.
current_id: AtomicU64,
/// One entry per request still waiting for a reply: the request id and the
/// one-shot sender used to deliver the matching `Message::Response`.
pending: Mutex<Vec<(u64, oneshot::Sender<Message>)>>,
/// Token the writer task waits on; once cancelled, the writer closes and drains
/// the outgoing queue and then closes the underlying sink. Nothing in this file
/// cancels it, so cancellation is presumably triggered elsewhere in the crate.
pub(crate) cancellation_token: CancellationToken,
}
impl Shared {
pub fn new() -> Self {
Self {
current_id: AtomicU64::new(0),
pending: Mutex::new(Vec::new()),
cancellation_token: CancellationToken::new(),
}
}
pub fn fetch_next_id(&self) -> u64 {
use std::sync::atomic::Ordering;
self.current_id.fetch_add(1, Ordering::SeqCst)
}
pub async fn append(&self, id: u64, tx: oneshot::Sender<Message>) {
let mut guard = self.pending.lock().await;
guard.push((id, tx));
}
pub async fn notify(&self, message: Message) {
match message {
Message::Response { id, error, result } => {
let mut guard = self.pending.lock().await;
if let Some(p) = guard.iter().position(|m| m.0 as u64 == id) {
let _ = guard
.remove(p)
.1
.send(Message::Response { id, error, result });
}
}
_ => {}
}
}
}
/// Client handle returned by `Neovim::new`.
///
/// It queues outgoing messages for the writer task and exposes, through its
/// `Stream` implementation, the messages placed on the incoming channel.
pub struct Neovim {
/// State shared with the reader and writer tasks (id counter, waiting
/// requests, cancellation token).
state: Arc<Shared>,
/// Receiving half of the incoming channel; its sending half is owned by the
/// reader task. Polled by the `Stream` implementation.
incoming_rx: Receiver<Message>,
/// Sending half of the outgoing channel; the writer task drains the other
/// half into the underlying writer.
outgoing_tx: Sender<Message>,
}
impl Neovim {
    /// Builds a client on top of `reader` / `writer` and spawns its two
    /// background tasks with `tokio::spawn` (so it has to be called from within
    /// a Tokio runtime).
    ///
    /// * The **reader task** decodes frames from `reader` with
    ///   `MessagePackCodec` and hands each message to
    ///   `handle_incoming_message`. Decode errors are logged to stderr. Once the
    ///   stream ends, every still-registered response sender is dropped so that
    ///   in-flight [`Neovim::request`] calls fail instead of waiting forever.
    /// * The **writer task** encodes messages queued through [`Neovim::send`]
    ///   into `writer`. It stops, closing the sink, either when every sender of
    ///   the outgoing queue has been dropped or when the shared cancellation
    ///   token is cancelled; in the latter case the queue is closed and drained
    ///   first so already-queued messages are still written.
    ///
    /// Both channels are bounded to 100 messages.
    pub fn new<R, W>(reader: R, writer: W) -> Self
    where
        R: AsyncRead + Sync + Send + Unpin + 'static,
        W: AsyncWrite + Sync + Send + Unpin + 'static,
    {
        let (incoming_tx, incoming_rx) = channel::<Message>(100);
        let (outgoing_tx, mut outgoing_rx) = channel::<Message>(100);
        let state = Arc::new(Shared::new());

        // Reader task.
        tokio::spawn({
            use futures::StreamExt;
            let state = Arc::clone(&state);
            let mut reader = FramedRead::new(reader, MessagePackCodec::new());
            async move {
                while let Some(res) = reader.next().await {
                    match res {
                        Ok(message) => {
                            handle_incoming_message(
                                incoming_tx.clone(),
                                Arc::clone(&state),
                                message,
                            )
                            .await;
                        }
                        Err(err) => {
                            eprintln!("Failed to decode incoming message: {:?}", err);
                        }
                    }
                }
                // No further response can arrive once the stream has ended.
                // Dropping the registered senders makes every waiting
                // `request` call observe a closed channel and return an error.
                state.pending.lock().await.clear();
            }
        });

        // Writer task.
        tokio::spawn({
            use futures::SinkExt;
            let mut writer = FramedWrite::new(writer, MessagePackCodec::new());
            let state = Arc::clone(&state);
            async move {
                loop {
                    tokio::select! {
                        next = outgoing_rx.recv() => match next {
                            Some(message) => {
                                if writer.send(message).await.is_err() {
                                    eprintln!("Failed to write outgoing message");
                                }
                            }
                            // Every sender is gone, so nothing can be queued
                            // anymore. Matching on `None` explicitly matters:
                            // a `Some(..)` select pattern would merely disable
                            // this branch and leave the task parked on the
                            // cancellation token forever.
                            None => break,
                        },
                        _ = state.cancellation_token.cancelled() => {
                            eprintln!("Closing writer channel...");
                            outgoing_rx.close();
                            eprintln!("Draining writer channel...");
                            while let Some(message) = outgoing_rx.recv().await {
                                let _ = writer.send(message).await;
                            }
                            eprintln!("Closing writer...");
                            break;
                        }
                    }
                }
                let _ = writer.close().await;
            }
        });

        Self {
            state,
            incoming_rx,
            outgoing_tx,
        }
    }

    /// Sends a request and waits for the matching response.
    ///
    /// The response channel is registered *before* the request is queued, so a
    /// reply that arrives immediately still finds its waiter.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InternalError`] when the outgoing queue is closed (the
    /// registration is rolled back in that case) or when the response channel
    /// is dropped before a reply arrives.
    pub async fn request(
        &self,
        method: impl Into<String>,
        params: Vec<Value>,
    ) -> Result<Message, Error> {
        let next_id = self.state.fetch_next_id();
        let (tx, rx) = oneshot::channel::<Message>();
        self.state.append(next_id, tx).await;

        let sent = self
            .send(Message::Request {
                id: next_id,
                method: method.into(),
                params: Value::from(params),
            })
            .await;
        if let Err(err) = sent {
            // The request never left, so no response will come: drop the
            // registration instead of leaking it.
            self.state
                .pending
                .lock()
                .await
                .retain(|(id, _)| *id != next_id);
            return Err(err);
        }

        rx.await.map_err(|_| Error::InternalError)
    }

    /// Sends a notification: a message that carries no id and expects no reply.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InternalError`] when the outgoing queue is closed.
    pub async fn notify(&self, method: impl Into<String>, params: Vec<Value>) -> Result<(), Error> {
        self.send(Message::Notification {
            method: method.into(),
            params: Value::from(params),
        })
        .await
    }

    /// Currently a no-op placeholder.
    pub async fn try_quit(&self) {}

    /// Currently a no-op placeholder.
    pub async fn force_quit(&self) {}

    /// Queues `message` for the writer task, waiting for capacity if the
    /// outgoing queue is full.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InternalError`] when the outgoing queue is closed.
    pub(crate) async fn send(&self, message: Message) -> Result<(), Error> {
        // Sending through a clone keeps this method usable from `&self`
        // regardless of whether `Sender::send` borrows the sender mutably.
        self.outgoing_tx
            .clone()
            .send(message)
            .await
            .map_err(|_| Error::InternalError)
    }
}
impl Stream for Neovim {
    type Item = Message;

    /// Polls the incoming channel: yields the next queued message, `None` once
    /// the channel is closed and empty, or `Pending` when nothing is available.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.incoming_rx.poll_recv(cx)
    }
}
/// Routes one decoded message coming from the peer.
///
/// * `Message::Response` is handed to `Shared::notify`, which delivers it to the
///   request waiting on that id.
/// * `Message::Request` and `Message::Notification` are forwarded unchanged into
///   `tx`, the incoming channel that the `Stream` implementation of `Neovim`
///   reads from. Previously these two kinds were silently discarded, so the
///   stream could never yield anything.
///
/// Forwarding waits for capacity when the incoming channel is full, which
/// applies backpressure to the caller. A failed forward (the receiving half is
/// gone) is ignored, because then nobody is left to consume the message.
async fn handle_incoming_message(tx: Sender<Message>, state: Arc<Shared>, message: Message) {
    match message {
        Message::Response { .. } => state.notify(message).await,
        Message::Request { .. } | Message::Notification { .. } => {
            // Sent through a clone, the same way `Neovim::send` uses its
            // sender, so this does not depend on whether `Sender::send`
            // borrows the sender mutably.
            let _ = tx.clone().send(message).await;
        }
    }
}