use std::{
pin::Pin,
sync::{atomic::AtomicU64, Mutex},
task::{Context, Poll},
};
use futures::{Future, Sink, SinkExt, Stream, StreamExt};
use pin_project_lite::pin_project;
use rmpv::Value;
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::{
mpsc::{channel, Receiver, Sender},
oneshot,
},
};
use tokio_util::codec::{FramedRead, FramedWrite};
use crate::{msgpack::MessagePackCodec, Message};
pub struct Nvim<R, W> {
reader: FramedRead<R, MessagePackCodec>,
writer: FramedWrite<W, MessagePackCodec>,
current_id: AtomicU64,
}
impl<R, W> Nvim<R, W>
where
R: AsyncRead + Sync + Send + Unpin + 'static,
W: AsyncWrite + Sync + Send + Unpin + 'static,
{
pub fn new(reader: R, writer: W) -> Self {
Self {
reader: FramedRead::new(reader, MessagePackCodec::new()),
writer: FramedWrite::new(writer, MessagePackCodec::new()),
current_id: AtomicU64::new(0),
}
}
pub async fn request(&mut self, method: impl Into<String>, params: Vec<Value>) -> Message {
let next_id = self
.current_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
self.writer
.send(Message::Request {
id: next_id,
method: method.into(),
params: Value::from(params),
})
.await
.unwrap();
self.wait(next_id).await
}
pub async fn notify(&mut self, method: impl Into<String>, params: Vec<Value>) -> () {
let next_id = self
.current_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let _ = self
.writer
.send(Message::Request {
id: next_id,
method: method.into(),
params: Value::from(params),
})
.await;
}
async fn wait(&mut self, wait_id: u64) -> Message {
loop {
let message = self.reader.next().await.unwrap().unwrap();
match message {
Message::Response { id, error, result } => {
if wait_id == id {
return Message::Response { id, error, result };
}
}
Message::Request { id, method, params } => {
}
Message::Notification { .. } => {
}
}
}
}
}