1use std::{
2 pin::Pin,
3 sync::{atomic::AtomicU64, Mutex},
4 task::{Context, Poll},
5};
6
7use futures::{Future, Sink, SinkExt, Stream, StreamExt};
8use pin_project_lite::pin_project;
9use rmpv::Value;
10use tokio::{
11 io::{AsyncRead, AsyncWrite},
12 sync::{
13 mpsc::{channel, Receiver, Sender},
14 oneshot,
15 },
16};
17use tokio_util::codec::{FramedRead, FramedWrite};
18
19use crate::{msgpack::MessagePackCodec, Message};
20
21pub struct Nvim<R, W> {
22 reader: FramedRead<R, MessagePackCodec>,
23 writer: FramedWrite<W, MessagePackCodec>,
24 current_id: AtomicU64,
25}
26
27impl<R, W> Nvim<R, W>
28where
29 R: AsyncRead + Sync + Send + Unpin + 'static,
30 W: AsyncWrite + Sync + Send + Unpin + 'static,
31{
32 pub fn new(reader: R, writer: W) -> Self {
33 Self {
34 reader: FramedRead::new(reader, MessagePackCodec::new()),
35 writer: FramedWrite::new(writer, MessagePackCodec::new()),
36 current_id: AtomicU64::new(0),
37 }
38 }
39
40 pub async fn request(&mut self, method: impl Into<String>, params: Vec<Value>) -> Message {
41 let next_id = self
42 .current_id
43 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
44
45 self.writer
46 .send(Message::Request {
47 id: next_id,
48 method: method.into(),
49 params: Value::from(params),
50 })
51 .await
52 .unwrap();
53
54 self.wait(next_id).await
55 }
56
57 pub async fn notify(&mut self, method: impl Into<String>, params: Vec<Value>) -> () {
58 let next_id = self
59 .current_id
60 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
61
62 let _ = self
63 .writer
64 .send(Message::Request {
65 id: next_id,
66 method: method.into(),
67 params: Value::from(params),
68 })
69 .await;
70 }
71
72 async fn wait(&mut self, wait_id: u64) -> Message {
73 loop {
74 let message = self.reader.next().await.unwrap().unwrap();
75
76 match message {
77 Message::Response { id, error, result } => {
78 if wait_id == id {
79 return Message::Response { id, error, result };
80 }
81 }
82 Message::Request { id, method, params } => {
83 }
85 Message::Notification { .. } => {
86 }
88 }
89 }
90 }
91}