spotify_cli/rpc/
server.rs1use std::path::PathBuf;
6use std::sync::Arc;
7
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::net::{UnixListener, UnixStream};
10use tokio::sync::broadcast;
11use tracing::{debug, error, info, warn};
12
13use crate::storage::paths;
14
15use super::dispatch::Dispatcher;
16use super::protocol::{RpcNotification, RpcRequest, RpcResponse, error_codes};
17
18pub struct ServerConfig {
20 pub socket_path: PathBuf,
21}
22
23impl Default for ServerConfig {
24 fn default() -> Self {
25 let socket_path =
26 paths::socket_file().unwrap_or_else(|_| PathBuf::from("/tmp/spotify-cli.sock"));
27
28 Self { socket_path }
29 }
30}
31
32pub struct Server {
34 config: ServerConfig,
35 dispatcher: Arc<Dispatcher>,
36 event_tx: broadcast::Sender<RpcNotification>,
37}
38
39impl Server {
40 pub fn new(config: ServerConfig) -> Self {
41 let (event_tx, _) = broadcast::channel(100);
42
43 Self {
44 config,
45 dispatcher: Arc::new(Dispatcher::new()),
46 event_tx,
47 }
48 }
49
50 pub fn socket_path(&self) -> &PathBuf {
52 &self.config.socket_path
53 }
54
55 pub fn event_sender(&self) -> broadcast::Sender<RpcNotification> {
57 self.event_tx.clone()
58 }
59
60 pub async fn run(&self) -> std::io::Result<()> {
62 if self.config.socket_path.exists() {
64 std::fs::remove_file(&self.config.socket_path)?;
65 }
66
67 let listener = UnixListener::bind(&self.config.socket_path)?;
68 info!(path = %self.config.socket_path.display(), "RPC server listening");
69
70 loop {
71 match listener.accept().await {
72 Ok((stream, _)) => {
73 let dispatcher = Arc::clone(&self.dispatcher);
74 let event_rx = self.event_tx.subscribe();
75
76 tokio::spawn(async move {
77 if let Err(e) = handle_client(stream, dispatcher, event_rx).await {
78 debug!(error = %e, "Client connection ended");
79 }
80 });
81 }
82 Err(e) => {
83 error!(error = %e, "Failed to accept connection");
84 }
85 }
86 }
87 }
88}
89
90async fn handle_client(
92 stream: UnixStream,
93 dispatcher: Arc<Dispatcher>,
94 mut event_rx: broadcast::Receiver<RpcNotification>,
95) -> std::io::Result<()> {
96 let (reader, mut writer) = stream.into_split();
97 let mut reader = BufReader::new(reader);
98 let mut line = String::new();
99
100 debug!("Client connected");
101
102 loop {
103 line.clear();
104
105 tokio::select! {
106 result = reader.read_line(&mut line) => {
108 match result {
109 Ok(0) => {
110 debug!("Client disconnected");
111 break;
112 }
113 Ok(_) => {
114 let response = process_request(&line, &dispatcher).await;
115 if let Some(resp) = response {
116 let json = serde_json::to_string(&resp).unwrap_or_default();
117 writer.write_all(json.as_bytes()).await?;
118 writer.write_all(b"\n").await?;
119 writer.flush().await?;
120 }
121 }
122 Err(e) => {
123 warn!(error = %e, "Read error");
124 break;
125 }
126 }
127 }
128
129 result = event_rx.recv() => {
131 match result {
132 Ok(notification) => {
133 let json = serde_json::to_string(¬ification).unwrap_or_default();
134 if writer.write_all(json.as_bytes()).await.is_err() {
135 break;
136 }
137 if writer.write_all(b"\n").await.is_err() {
138 break;
139 }
140 let _ = writer.flush().await;
141 }
142 Err(broadcast::error::RecvError::Lagged(_)) => {
143 continue;
145 }
146 Err(broadcast::error::RecvError::Closed) => {
147 break;
148 }
149 }
150 }
151 }
152 }
153
154 Ok(())
155}
156
157async fn process_request(line: &str, dispatcher: &Dispatcher) -> Option<RpcResponse> {
159 let line = line.trim();
160 if line.is_empty() {
161 return None;
162 }
163
164 let request: RpcRequest = match serde_json::from_str(line) {
166 Ok(req) => req,
167 Err(e) => {
168 return Some(RpcResponse::error(
169 serde_json::Value::Null,
170 error_codes::PARSE_ERROR,
171 &format!("Parse error: {}", e),
172 None,
173 ));
174 }
175 };
176
177 if request.is_notification() {
179 let _ = dispatcher.dispatch(&request).await;
180 return None;
181 }
182
183 let id = request.id.clone().unwrap_or(serde_json::Value::Null);
185
186 let response = dispatcher.dispatch(&request).await;
188 Some(RpcResponse::from_response(id, response))
189}
190
191impl Drop for Server {
192 fn drop(&mut self) {
193 if self.config.socket_path.exists() {
195 let _ = std::fs::remove_file(&self.config.socket_path);
196 }
197 }
198}