cord-nvim 2.0.0-beta.1

🚀 The most extensible Discord Rich Presence plugin for Neovim, powered by Rust.
use std::io;
use std::os::unix::net::UnixListener;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::thread::JoinHandle;

use super::client::PipeClient;
use crate::ipc::pipe::{PipeClientImpl, PipeServerImpl};
use crate::messages::events::local::ErrorEvent;
use crate::messages::message::Message;
use crate::session::SessionManager;
use crate::{client_event, echo, local_event};

pub struct PipeServer {
    session_manager: Arc<SessionManager>,
    pipe_name: String,
    tx: Sender<Message>,
    next_client_id: Arc<AtomicU32>,
    running: Arc<AtomicBool>,
    listener: Option<UnixListener>,
    thread_handle: Option<JoinHandle<()>>,
}

impl PipeServerImpl for PipeServer {
    fn new(
        pipe_name: &str,
        tx: Sender<Message>,
        session_manager: Arc<SessionManager>,
    ) -> Self {
        Self {
            session_manager,
            pipe_name: pipe_name.to_string(),
            tx,
            next_client_id: Arc::new(AtomicU32::new(0)),
            running: Arc::new(AtomicBool::new(false)),
            listener: None,
            thread_handle: None,
        }
    }

    fn start(&mut self) -> io::Result<()> {
        if self.running.load(Ordering::SeqCst) {
            return Ok(());
        }

        if Path::new(&self.pipe_name).exists() {
            std::fs::remove_file(&self.pipe_name)?;
        }

        let listener = UnixListener::bind(&self.pipe_name)?;
        self.listener = Some(listener);
        self.running.store(true, Ordering::SeqCst);

        let tx = self.tx.clone();
        let session_manager = Arc::clone(&self.session_manager);
        let next_client_id = Arc::clone(&self.next_client_id);
        let running = Arc::clone(&self.running);
        let listener = self.listener.as_ref().unwrap().try_clone()?;

        self.thread_handle = Some(std::thread::spawn(move || {
            let mut notified = false;
            while running.load(Ordering::SeqCst) {
                if !notified {
                    echo!("Ready");
                    notified = true;
                }

                match listener.accept() {
                    Ok((stream, _)) => {
                        let client_id =
                            next_client_id.fetch_add(1, Ordering::SeqCst);
                        let mut client =
                            PipeClient::new(client_id, stream, tx.clone());
                        client.start_read_thread().ok();
                        session_manager.create_session(client_id, client);
                        tx.send(client_event!(client_id, Connect)).ok();
                    }
                    Err(e) => {
                        tx.send(local_event!(
                            0,
                            Error,
                            ErrorEvent::new(Box::new(e))
                        ))
                        .ok();
                    }
                }
            }
        }));

        Ok(())
    }

    fn stop(&mut self) {
        self.running.store(false, Ordering::SeqCst);
        if let Some(handle) = self.thread_handle.take() {
            drop(handle);
        }
        if let Some(listener) = self.listener.take() {
            drop(listener);
        }
        let _ = std::fs::remove_file(&self.pipe_name);
    }

    fn broadcast(&self, data: &[u8]) -> io::Result<()> {
        let mut sessions = self.session_manager.sessions.write().unwrap();
        for session in sessions.values_mut() {
            if let Some(client) = session.get_pipe_client_mut() {
                client.write(data)?;
            }
        }
        Ok(())
    }

    fn write_to(&self, client_id: u32, data: &[u8]) -> io::Result<()> {
        let mut sessions = self.session_manager.sessions.write().unwrap();
        if let Some(session) = sessions.get_mut(&client_id) {
            if let Some(client) = session.get_pipe_client_mut() {
                return client.write(data);
            }
        }
        Err(io::Error::new(io::ErrorKind::NotFound, "Client not found"))
    }

    fn disconnect(&self, client_id: u32) -> io::Result<()> {
        self.session_manager.remove_session(client_id);
        Ok(())
    }
}

impl Drop for PipeServer {
    fn drop(&mut self) {
        self.stop();
    }
}