spotify_cli/rpc/
server.rs

1//! Unix socket server for JSON-RPC
2//!
3//! Handles client connections, parses JSON-RPC requests, and dispatches to handlers.
4
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::net::{UnixListener, UnixStream};
10use tokio::sync::broadcast;
11use tracing::{debug, error, info, warn};
12
13use crate::storage::paths;
14
15use super::dispatch::Dispatcher;
16use super::protocol::{RpcNotification, RpcRequest, RpcResponse, error_codes};
17
18/// RPC Server configuration
19pub struct ServerConfig {
20    pub socket_path: PathBuf,
21}
22
23impl Default for ServerConfig {
24    fn default() -> Self {
25        let socket_path =
26            paths::socket_file().unwrap_or_else(|_| PathBuf::from("/tmp/spotify-cli.sock"));
27
28        Self { socket_path }
29    }
30}
31
32/// RPC Server
33pub struct Server {
34    config: ServerConfig,
35    dispatcher: Arc<Dispatcher>,
36    event_tx: broadcast::Sender<RpcNotification>,
37}
38
39impl Server {
40    pub fn new(config: ServerConfig) -> Self {
41        let (event_tx, _) = broadcast::channel(100);
42
43        Self {
44            config,
45            dispatcher: Arc::new(Dispatcher::new()),
46            event_tx,
47        }
48    }
49
50    /// Get the socket path
51    pub fn socket_path(&self) -> &PathBuf {
52        &self.config.socket_path
53    }
54
55    /// Get event broadcaster for sending notifications
56    pub fn event_sender(&self) -> broadcast::Sender<RpcNotification> {
57        self.event_tx.clone()
58    }
59
60    /// Run the server
61    pub async fn run(&self) -> std::io::Result<()> {
62        // Remove existing socket file
63        if self.config.socket_path.exists() {
64            std::fs::remove_file(&self.config.socket_path)?;
65        }
66
67        let listener = UnixListener::bind(&self.config.socket_path)?;
68        info!(path = %self.config.socket_path.display(), "RPC server listening");
69
70        loop {
71            match listener.accept().await {
72                Ok((stream, _)) => {
73                    let dispatcher = Arc::clone(&self.dispatcher);
74                    let event_rx = self.event_tx.subscribe();
75
76                    tokio::spawn(async move {
77                        if let Err(e) = handle_client(stream, dispatcher, event_rx).await {
78                            debug!(error = %e, "Client connection ended");
79                        }
80                    });
81                }
82                Err(e) => {
83                    error!(error = %e, "Failed to accept connection");
84                }
85            }
86        }
87    }
88}
89
90/// Handle a single client connection
91async fn handle_client(
92    stream: UnixStream,
93    dispatcher: Arc<Dispatcher>,
94    mut event_rx: broadcast::Receiver<RpcNotification>,
95) -> std::io::Result<()> {
96    let (reader, mut writer) = stream.into_split();
97    let mut reader = BufReader::new(reader);
98    let mut line = String::new();
99
100    debug!("Client connected");
101
102    loop {
103        line.clear();
104
105        tokio::select! {
106            // Handle incoming requests
107            result = reader.read_line(&mut line) => {
108                match result {
109                    Ok(0) => {
110                        debug!("Client disconnected");
111                        break;
112                    }
113                    Ok(_) => {
114                        let response = process_request(&line, &dispatcher).await;
115                        if let Some(resp) = response {
116                            let json = serde_json::to_string(&resp).unwrap_or_default();
117                            writer.write_all(json.as_bytes()).await?;
118                            writer.write_all(b"\n").await?;
119                            writer.flush().await?;
120                        }
121                    }
122                    Err(e) => {
123                        warn!(error = %e, "Read error");
124                        break;
125                    }
126                }
127            }
128
129            // Forward event notifications to client
130            result = event_rx.recv() => {
131                match result {
132                    Ok(notification) => {
133                        let json = serde_json::to_string(&notification).unwrap_or_default();
134                        if writer.write_all(json.as_bytes()).await.is_err() {
135                            break;
136                        }
137                        if writer.write_all(b"\n").await.is_err() {
138                            break;
139                        }
140                        let _ = writer.flush().await;
141                    }
142                    Err(broadcast::error::RecvError::Lagged(_)) => {
143                        // Client is too slow, skip some events
144                        continue;
145                    }
146                    Err(broadcast::error::RecvError::Closed) => {
147                        break;
148                    }
149                }
150            }
151        }
152    }
153
154    Ok(())
155}
156
157/// Process a single JSON-RPC request
158async fn process_request(line: &str, dispatcher: &Dispatcher) -> Option<RpcResponse> {
159    let line = line.trim();
160    if line.is_empty() {
161        return None;
162    }
163
164    // Parse the request
165    let request: RpcRequest = match serde_json::from_str(line) {
166        Ok(req) => req,
167        Err(e) => {
168            return Some(RpcResponse::error(
169                serde_json::Value::Null,
170                error_codes::PARSE_ERROR,
171                &format!("Parse error: {}", e),
172                None,
173            ));
174        }
175    };
176
177    // Notifications don't get responses
178    if request.is_notification() {
179        let _ = dispatcher.dispatch(&request).await;
180        return None;
181    }
182
183    // Get the request id
184    let id = request.id.clone().unwrap_or(serde_json::Value::Null);
185
186    // Dispatch and return response
187    let response = dispatcher.dispatch(&request).await;
188    Some(RpcResponse::from_response(id, response))
189}
190
191impl Drop for Server {
192    fn drop(&mut self) {
193        // Clean up socket file
194        if self.config.socket_path.exists() {
195            let _ = std::fs::remove_file(&self.config.socket_path);
196        }
197    }
198}