cord-nvim 2.0.0-beta.1

🚀 The most extensible Discord Rich Presence plugin for Neovim, powered by Rust.
#![allow(clippy::upper_case_acronyms)]

use std::fs::File;
use std::io;
use std::os::windows::io::FromRawHandle;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::thread::JoinHandle;

use super::client::PipeClient;
use crate::ipc::bindings::{
    CloseHandle, ConnectNamedPipe, CreateEventW, CreateNamedPipeW,
    GetLastError, GetOverlappedResult, Overlapped, ERROR_IO_PENDING,
    ERROR_PIPE_CONNECTED, FILE_FLAG_OVERLAPPED, HANDLE, INVALID_HANDLE_VALUE,
    PIPE_ACCESS_DUPLEX, PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE,
    PIPE_UNLIMITED_INSTANCES, PIPE_WAIT,
};
use crate::ipc::pipe::{PipeClientImpl, PipeServerImpl};
use crate::messages::events::local::ErrorEvent;
use crate::messages::message::Message;
use crate::session::SessionManager;
use crate::{client_event, echo, local_event};

pub struct PipeServer {
    session_manager: Arc<SessionManager>,
    pipe_name: String,
    tx: Sender<Message>,
    next_client_id: Arc<AtomicU32>,
    running: Arc<AtomicBool>,
    thread_handle: Option<JoinHandle<()>>,
}

impl PipeServerImpl for PipeServer {
    fn new(
        pipe_name: &str,
        tx: Sender<Message>,
        session_manager: Arc<SessionManager>,
    ) -> Self {
        Self {
            session_manager,
            pipe_name: pipe_name.to_string(),
            tx,
            next_client_id: Arc::new(AtomicU32::new(0)),
            running: Arc::new(AtomicBool::new(false)),
            thread_handle: None,
        }
    }

    fn start(&mut self) -> io::Result<()> {
        if self.running.swap(true, Ordering::SeqCst) {
            return Ok(());
        }

        let pipe_name = self.pipe_name.clone();
        let session_manager = Arc::clone(&self.session_manager);
        let next_client_id = Arc::clone(&self.next_client_id);
        let running = Arc::clone(&self.running);
        let tx = self.tx.clone();

        self.thread_handle = Some(std::thread::spawn(move || {
            let mut notified = false;
            while running.load(Ordering::SeqCst) {
                if let Ok(handle) = PipeServer::create_pipe_instance(&pipe_name)
                {
                    unsafe {
                        let h_event = CreateEventW(
                            std::ptr::null_mut(),
                            1,
                            0,
                            std::ptr::null_mut(),
                        );
                        if h_event.is_null() {
                            CloseHandle(handle);
                            tx.send(local_event!(
                                0,
                                Error,
                                ErrorEvent::new(Box::new(
                                    io::Error::last_os_error()
                                ))
                            ))
                            .ok();
                            continue;
                        }

                        let mut overlapped = Overlapped {
                            internal: 0,
                            internal_high: 0,
                            offset: 0,
                            offset_high: 0,
                            h_event,
                        };

                        let connect_result =
                            ConnectNamedPipe(handle, &mut overlapped);
                        if connect_result == 0 {
                            let error = GetLastError();
                            if error != ERROR_IO_PENDING
                                && error != ERROR_PIPE_CONNECTED
                            {
                                CloseHandle(handle);
                                CloseHandle(h_event);
                                tx.send(local_event!(
                                    0,
                                    Error,
                                    ErrorEvent::new(Box::new(
                                        io::Error::from_raw_os_error(
                                            error as _,
                                        )
                                    ))
                                ))
                                .ok();
                                continue;
                            }
                        }

                        if !notified {
                            echo!("Ready");
                            notified = true;
                        }

                        let mut bytes_transferred = 0;
                        if GetOverlappedResult(
                            handle,
                            &mut overlapped,
                            &mut bytes_transferred,
                            1,
                        ) == 0
                        {
                            let error = GetLastError();
                            CloseHandle(handle);
                            CloseHandle(h_event);
                            tx.send(local_event!(
                                0,
                                Error,
                                ErrorEvent::new(Box::new(
                                    io::Error::from_raw_os_error(error as _)
                                ))
                            ))
                            .ok();
                            continue;
                        }

                        let client_id =
                            next_client_id.fetch_add(1, Ordering::SeqCst);
                        let mut client = PipeClient::new(
                            client_id,
                            File::from_raw_handle(handle as _),
                            tx.clone(),
                        );
                        client.start_read_thread().ok();
                        session_manager.create_session(client_id, client);
                        tx.send(client_event!(client_id, Connect)).ok();

                        CloseHandle(h_event);
                    }
                }
            }
        }));

        Ok(())
    }

    fn stop(&mut self) {
        self.running.store(false, Ordering::SeqCst);
        if let Some(handle) = self.thread_handle.take() {
            drop(handle);
        }
    }

    fn broadcast(&self, data: &[u8]) -> io::Result<()> {
        let mut sessions = self.session_manager.sessions.write().unwrap();
        for session in sessions.values_mut() {
            if let Some(client) = session.get_pipe_client_mut() {
                client.write(data)?;
            }
        }
        Ok(())
    }

    fn write_to(&self, client_id: u32, data: &[u8]) -> io::Result<()> {
        let mut sessions = self.session_manager.sessions.write().unwrap();
        if let Some(session) = sessions.get_mut(&client_id) {
            if let Some(client) = session.get_pipe_client_mut() {
                return client.write(data);
            }
        }
        Err(io::Error::new(io::ErrorKind::NotFound, "Client not found"))
    }

    fn disconnect(&self, client_id: u32) -> io::Result<()> {
        self.session_manager.remove_session(client_id);
        Ok(())
    }
}

impl PipeServer {
    fn create_pipe_instance(pipe_name: &str) -> io::Result<HANDLE> {
        let wide_name: Vec<u16> =
            pipe_name.encode_utf16().chain(std::iter::once(0)).collect();

        let handle = unsafe {
            CreateNamedPipeW(
                wide_name.as_ptr(),
                PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
                PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
                PIPE_UNLIMITED_INSTANCES,
                1024 * 16,
                1024 * 16,
                0,
                std::ptr::null_mut(),
            )
        };

        if handle == INVALID_HANDLE_VALUE {
            Err(io::Error::last_os_error())
        } else {
            Ok(handle)
        }
    }
}

impl Drop for PipeServer {
    fn drop(&mut self) {
        self.stop();
    }
}