ncspot 1.3.4

ncurses Spotify client written in Rust using librespot, inspired by ncmpc and the likes.
Documentation
use std::{io, path::PathBuf};

use futures::SinkExt;
use log::{debug, error, info};
use tokio::net::{UnixListener, UnixStream};
use tokio::runtime::Handle;
use tokio::sync::watch::{Receiver, Sender};
use tokio_stream::StreamExt;
use tokio_stream::wrappers::WatchStream;
use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec};

use crate::events::{Event, EventManager};
use crate::model::playable::Playable;
use crate::spotify::PlayerEvent;

pub struct IpcSocket {
    tx: Sender<Status>,
    path: PathBuf,
}

#[derive(Clone, Debug, Serialize)]
struct Status {
    mode: PlayerEvent,
    playable: Option<Playable>,
}

impl Drop for IpcSocket {
    fn drop(&mut self) {
        self.try_remove_socket();
    }
}

impl IpcSocket {
    pub fn new(handle: &Handle, path: PathBuf, ev: EventManager) -> io::Result<Self> {
        let path = if path.exists() && Self::is_open_socket(&path) {
            let mut new_path = path;
            new_path.set_file_name(format!("ncspot.{}.sock", std::process::id()));
            new_path
        } else if path.exists() && !Self::is_open_socket(&path) {
            std::fs::remove_file(&path)?;
            path
        } else {
            path
        };

        info!("Creating IPC domain socket at {path:?}");

        let status = Status {
            mode: PlayerEvent::Stopped,
            playable: None,
        };

        let (tx, rx) = tokio::sync::watch::channel(status);
        let listener_path = path.clone();
        handle.spawn(async move {
            let listener =
                UnixListener::bind(listener_path).expect("Could not create IPC domain socket");
            Self::worker(listener, ev, rx.clone()).await;
        });

        Ok(Self { tx, path })
    }

    fn is_open_socket(path: &PathBuf) -> bool {
        std::os::unix::net::UnixStream::connect(path).is_ok()
    }

    pub fn publish(&self, event: &PlayerEvent, playable: Option<Playable>) {
        let status = Status {
            mode: event.clone(),
            playable,
        };
        self.tx.send(status).unwrap();
    }

    async fn worker(listener: UnixListener, ev: EventManager, tx: Receiver<Status>) {
        loop {
            match listener.accept().await {
                Ok((stream, sockaddr)) => {
                    debug!("Connection from {sockaddr:?}");
                    tokio::spawn(Self::stream_handler(
                        stream,
                        ev.clone(),
                        WatchStream::new(tx.clone()),
                    ));
                }
                Err(e) => error!("Error accepting connection: {e}"),
            }
        }
    }

    async fn stream_handler(
        mut stream: UnixStream,
        ev: EventManager,
        mut rx: WatchStream<Status>,
    ) -> Result<(), String> {
        let (reader, writer) = stream.split();
        let mut framed_reader = FramedRead::new(reader, LinesCodec::new());
        let mut framed_writer = FramedWrite::new(writer, LinesCodec::new());

        loop {
            tokio::select! {
                line = framed_reader.next() => {
                    match line {
                        Some(Ok(line)) => {
                            debug!("Received line: \"{line}\"");
                            ev.send(Event::IpcInput(line));
                        }
                        Some(Err(e)) => error!("Error reading line: {e}"),
                        None => {
                            debug!("Closing IPC connection");
                            return Ok(())
                        }
                    }
                }
                Some(status) = rx.next() => {
                    debug!("IPC Status update: {status:?}");
                    let status_str = serde_json::to_string(&status).map_err(|e| e.to_string())?;
                    framed_writer.send(status_str).await.map_err(|e| e.to_string())?;
                }
                else => {
                    error!("All streams are closed");
                    return Ok(())
                }
            }
        }
    }

    /// Try to remove the IPC socket if there is one for this instance of `ncspot`. Don't do
    /// anything if the socket has already been removed for some reason.
    fn try_remove_socket(&mut self) {
        if std::fs::remove_file(&self.path).is_ok() {
            info!("removed socket at {:?}", self.path);
        } else {
            info!("socket already removed");
        }
    }
}