nui 0.0.1

Experimental neovim RPC for UI clients
Documentation
use futures::Stream;
use rmpv::Value;
use std::{
    pin::Pin,
    sync::{atomic::AtomicU64, Arc},
    task::{Context, Poll},
};
use tokio::{
    io::{AsyncRead, AsyncWrite},
    sync::{
        mpsc::{channel, Receiver, Sender},
        oneshot, Mutex,
    },
};
use tokio_util::{
    codec::{FramedRead, FramedWrite},
    sync::CancellationToken,
};

use crate::msgpack::{Message, MessagePackCodec};

#[derive(Debug)]
pub enum Event {
    Request(String, Vec<Value>, oneshot::Sender<Result<Value, Value>>),
    Notification(String, Vec<Value>),
}

#[derive(Debug)]
pub enum Error {
    NeovimError,
    SendError,
    InternalError,
}

// struct Inner {
//     atomic_id: AtomicU64,
//     queue: Mutex<Vec<(u64, oneshot::Sender<Message>)>>,
// }
//
// impl Inner {
//     pub async fn push(&self) -> (u64, oneshot::Receiver<Message>) {
//         let next_id = self.next_id();
//         let (tx, rx) = oneshot::channel::<Message>();
//
//         {
//             let mut guard = self.queue.lock().await;
//             guard.push((next_id, tx));
//         }
//
//         (next_id, rx)
//     }
//
//     pub async fn find_sender(&self, message: Message) {
//         match message {
//             Message::Response { id, error, result } => {
//                 let mut guard = self.queue.lock().await;
//
//                 if let Some(p) = guard.iter().position(|m| m.0 as u64 == id) {
//                     guard
//                         .remove(p)
//                         .1
//                         .send(Message::Response { id, error, result })
//                         .unwrap()
//                 }
//             }
//             _ => {}
//         }
//     }
//
//     pub fn next_id(&self) -> u64 {
//         use std::sync::atomic::Ordering;
//
//         self.atomic_id.fetch_add(1, Ordering::SeqCst)
//     }
// }

pub struct Shared {
    current_id: AtomicU64,
    pending: Mutex<Vec<(u64, oneshot::Sender<Message>)>>,
    pub(crate) cancellation_token: CancellationToken,
}

impl Shared {
    pub fn new() -> Self {
        Self {
            current_id: AtomicU64::new(0),
            pending: Mutex::new(Vec::new()),
            cancellation_token: CancellationToken::new(),
        }
    }

    pub fn fetch_next_id(&self) -> u64 {
        use std::sync::atomic::Ordering;

        self.current_id.fetch_add(1, Ordering::SeqCst)
    }

    pub async fn append(&self, id: u64, tx: oneshot::Sender<Message>) {
        let mut guard = self.pending.lock().await;
        guard.push((id, tx));
    }

    pub async fn notify(&self, message: Message) {
        match message {
            Message::Response { id, error, result } => {
                let mut guard = self.pending.lock().await;
                if let Some(p) = guard.iter().position(|m| m.0 as u64 == id) {
                    let _ = guard
                        .remove(p)
                        .1
                        .send(Message::Response { id, error, result });
                }
            }
            _ => {}
        }
    }
}

pub struct Neovim {
    state: Arc<Shared>,
    incoming_rx: Receiver<Message>,
    outgoing_tx: Sender<Message>,
}

impl Neovim {
    pub fn new<R, W>(reader: R, writer: W) -> Self
    where
        R: AsyncRead + Sync + Send + Unpin + 'static,
        W: AsyncWrite + Sync + Send + Unpin + 'static,
    {
        let (incoming_tx, incoming_rx) = channel::<Message>(100);
        let (outgoing_tx, mut outgoing_rx) = channel::<Message>(100);
        let state = Arc::new(Shared::new());

        tokio::spawn({
            use futures::StreamExt;

            let state = state.clone();
            let mut reader = FramedRead::new(reader, MessagePackCodec::new());
            // let outgoing_tx = outgoing_tx.clone();

            async move {
                while let Some(res) = reader.next().await {
                    match res {
                        Ok(message) => {
                            handle_incoming_message(incoming_tx.clone(), state.clone(), message)
                                .await;
                        }
                        Err(err) => {
                            dbg!("teste err", err);
                        }
                    };
                }

                // `reader` reached EOF, dispatch cancel token
                // state.cancellation_token.cancel();
            }
        });

        tokio::spawn({
            use futures::SinkExt;

            let mut writer = FramedWrite::new(writer, MessagePackCodec::new());
            let state = state.clone();
            // let inner = inner.clone();

            async move {
                loop {
                    tokio::select! {
                        Some(message) = outgoing_rx.recv() => {
                            let _ = writer.send(message).await;
                        },
                        _ = state.cancellation_token.cancelled() => {
                            eprintln!("Closing writer channel...");
                            outgoing_rx.close();

                            eprintln!("Draining writer channel...");
                            while let Some(message) = outgoing_rx.recv().await {
                                let _ = writer.send(message).await;
                            }

                            eprintln!("Closing writer...");
                            let _ = writer.close().await;

                            return;
                        }
                    }
                }
            }
        });

        Self {
            state,
            incoming_rx,
            outgoing_tx,
        }
    }

    pub async fn request(
        &self,
        method: impl Into<String>,
        params: Vec<Value>,
    ) -> Result<Message, Error> {
        let next_id = self.state.fetch_next_id();

        self.send(Message::Request {
            id: next_id,
            method: method.into(),
            params: Value::from(params),
        })
        .await?;

        let (tx, rx) = oneshot::channel::<Message>();

        self.state.append(next_id, tx).await;

        rx.await.map_err(|_| Error::InternalError)
    }

    pub async fn notify(&self, method: impl Into<String>, params: Vec<Value>) -> Result<(), Error> {
        self.send(Message::Request {
            id: self.state.fetch_next_id(),
            method: method.into(),
            params: Value::from(params),
        })
        .await
    }

    pub async fn try_quit(&self) {}

    pub async fn force_quit(&self) {}

    pub(crate) async fn send(&self, message: Message) -> Result<(), Error> {
        self.outgoing_tx
            .clone()
            .send(message)
            .await
            .map_err(|_| Error::InternalError)
    }
}

impl Stream for Neovim {
    type Item = Message;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.incoming_rx.poll_recv(cx)
    }
}

async fn handle_incoming_message(tx: Sender<Message>, state: Arc<Shared>, message: Message) {
    match message {
        Message::Notification { method, params } => {
            // tx.send(Event::Notification(method, params.try_into().unwrap()))
            //     .await
            //     .unwrap();
        }
        Message::Request { id, method, params } => {
            // let (tx, rx) = oneshot::channel::<Result<Value, Value>>();
            //
            // dbg!("here");
            //
            // incoming_tx
            //     .send(Event::Request(method, params.try_into().unwrap(), tx))
            //     .await
            //     .unwrap();
            //
            // tokio::spawn({
            //     let outgoing_tx = outgoing_tx.clone();
            //
            //     async move {
            //         match rx.await.unwrap() {
            //             Ok(result) => {
            //                 let _ = outgoing_tx
            //                     .send(Message::Response {
            //                         id,
            //                         error: Value::Nil,
            //                         result,
            //                     })
            //                     .await;
            //             }
            //             Err(error) => {
            //                 let _ = outgoing_tx
            //                     .send(Message::Response {
            //                         id,
            //                         error,
            //                         result: Value::Nil,
            //                     })
            //                     .await;
            //             }
            //         };
            //     }
            // });
        }
        Message::Response { .. } => {
            state.notify(message).await;
            // inner.find_sender(message).await;
        }
    }
}