mpv_ipc/
lib.rs

1use anyhow::{anyhow, bail, Context};
2use log::{debug, info, trace, warn};
3use serde::de::DeserializeOwned;
4use serde::{Deserialize, Serialize};
5use serde_json::json;
6use std::borrow::{BorrowMut, Cow};
7use std::collections::HashMap;
8use std::fmt::Display;
9use std::future::Future;
10use std::path::PathBuf;
11use std::process::Stdio;
12use std::sync::Arc;
13use std::time::{Duration, SystemTime};
14use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader, Lines, WriteHalf};
15use tokio::process::Child;
16use tokio::sync::{mpsc, oneshot, watch, Mutex};
17use tokio::task::JoinHandle;
18use tokio::{process, time};
19use tokio_util::sync::CancellationToken;
20
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reports a time earlier than the Unix epoch.
fn unix_timestamp() -> u64 {
    let now = SystemTime::now();
    let since_epoch = now.duration_since(SystemTime::UNIX_EPOCH).unwrap();
    since_epoch.as_secs()
}
27
/// Windows transport: mpv's IPC endpoint is reached through a named pipe.
#[cfg(target_os = "windows")]
mod mpv_platform {
    use crate::unix_timestamp;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeClient};

    /// Stream type used to talk to mpv on this platform.
    pub type Stream = NamedPipeClient;

    /// Number of paths generated so far by this process; keeps paths created within the same
    /// second distinct. Only uniqueness matters, so relaxed ordering is sufficient.
    static NEXT_PATH_ID: AtomicUsize = AtomicUsize::new(0);

    /// Opens the named pipe at `path`. The underlying I/O error is discarded.
    pub async fn connect(path: &Path) -> Result<Stream, ()> {
        ClientOptions::new().open(path).or(Err(()))
    }

    /// Builds a fresh pipe name from the current timestamp, this process's id and a counter,
    /// so that neither two calls in the same second nor two processes produce the same name.
    pub fn generate_ipc_path() -> PathBuf {
        let n = NEXT_PATH_ID.fetch_add(1, Ordering::Relaxed);
        format!(
            "\\\\.\\pipe\\mpv_ipc_{}_{}_{}",
            unix_timestamp(),
            std::process::id(),
            n
        )
        .into()
    }

    /// Executable name used when the caller does not supply an mpv path.
    pub fn default_mpv_bin() -> PathBuf {
        "mpv.exe".into()
    }
}
45#[cfg(not(target_os = "windows"))]
46mod mpv_platform {
47    use crate::unix_timestamp;
48    use std::path::PathBuf;
49    use tokio::net::UnixStream;
50    pub type Stream = UnixStream;
51    pub async fn connect(path: &PathBuf) -> Result<Stream, ()> {
52        UnixStream::connect(&path).await.or(Err(()))
53    }
54    pub fn generate_ipc_path() -> PathBuf {
55        let dir = std::env::temp_dir();
56        dir.join(format!("mpv_ipc_{}.sock", unix_timestamp()))
57    }
58    pub fn default_mpv_bin() -> PathBuf {
59        "mpv".into()
60    }
61}
62
/// A command message as written to the mpv IPC stream (serialized to one JSON line).
#[derive(Serialize, Deserialize)]
struct MpvCommand {
    /// Identifier used to match mpv's reply to the pending request.
    request_id: usize,
    /// The command payload, e.g. `["get_property", "pid"]`.
    command: serde_json::Value,
}
68
/// A reply to a previously sent command, as read from the mpv IPC stream.
///
/// Messages that do not deserialize into this shape are treated as events by the reader task.
#[derive(Serialize, Deserialize, Debug)]
struct MpvResponse {
    /// Identifier of the request this reply answers.
    request_id: usize,
    /// Reply payload; may be absent.
    data: MpvDataOption,
    /// Status string; the reader treats exactly `"success"` as success and anything else as an error.
    error: String,
}
75
/// A shared, async-locked map keyed by a numeric id (request id or observer id).
type LockedMpvIdMap<T> = Arc<Mutex<HashMap<usize, T>>>;
/// Optional JSON payload, as carried by replies and property-change events.
type MpvDataOption = Option<serde_json::Value>;
78
/// Options controlling how a new mpv process is launched.
///
/// Every field defaults to "unset" (`None` / `false`); the `Default` implementation is derived
/// because that is exactly what the derive produces for these field types.
#[derive(Clone, Default)]
pub struct MpvSpawnOptions {
    /// Path of the mpv executable. When `None`, the platform's default binary name is used.
    pub mpv_path: Option<PathBuf>,
    /// IPC socket / pipe path handed to mpv. When `None`, a fresh path is generated.
    pub ipc_path: Option<PathBuf>,
    /// Passed to mpv as `--config-dir=` when set.
    pub config_dir: Option<PathBuf>,
    /// When `true`, mpv inherits this process's stdout and stderr; otherwise both are discarded.
    pub inherit_stdout: bool,
}
96
/// A client connection to an mpv instance over its JSON IPC interface.
pub struct MpvIpc {
    /// Cancelled once the connection is considered dead (mpv shutdown, read failure, quit or disconnect).
    shutdown: CancellationToken,
    /// Write half of the IPC stream; commands are written here as JSON lines.
    writer: WriteHalf<mpv_platform::Stream>,
    /// Last id handed out; incremented for every command and every property observer.
    request_id: usize,
    /// Reply channels of commands that are still waiting for an answer, keyed by request id.
    requests: LockedMpvIdMap<oneshot::Sender<anyhow::Result<serde_json::Value>>>,
    /// Event listeners, keyed by event name.
    event_handlers: Arc<Mutex<HashMap<String, Vec<mpsc::Sender<serde_json::Value>>>>>,
    /// Property observers, keyed by the id passed to `observe_property`.
    observers: LockedMpvIdMap<mpsc::Sender<MpvDataOption>>,
    /// Background tasks (reader, event watchers, property converters) that are aborted on teardown.
    tasks: Vec<JoinHandle<()>>,
    /// The mpv process when it was started by `spawn`; `None` when attached via `connect`.
    child: Option<Child>,
}
107impl MpvIpc {
108    /// Attach to an existing mpv IPC socket.
109    pub async fn connect(ipc_path: &PathBuf) -> anyhow::Result<Self> {
110        // Retry before giving up
111        let (mut line_reader, writer): (Lines<_>, WriteHalf<_>) = async {
112            for n in 0..10 {
113                if n > 0 {
114                    time::sleep(Duration::from_millis(100) * n).await;
115                }
116                if let Ok(stream) = mpv_platform::connect(ipc_path).await {
117                    debug!("Connected to mpv socket");
118                    let (reader, writer) = io::split(stream);
119                    let line_reader = BufReader::new(reader).lines();
120                    return Ok((line_reader, writer));
121                }
122            }
123            bail!("failed to connect to mpv socket");
124        }
125        .await?;
126
127        let requests = Arc::new(Mutex::new(HashMap::<
128            usize,
129            oneshot::Sender<anyhow::Result<serde_json::Value>>,
130        >::new()));
131        let observers = Arc::new(Mutex::new(HashMap::<usize, mpsc::Sender<MpvDataOption>>::new()));
132        let event_handlers = Arc::new(Mutex::new(
133            HashMap::<String, Vec<mpsc::Sender<serde_json::Value>>>::new(),
134        ));
135        let shutdown = CancellationToken::new();
136
137        let shutdown_ref = shutdown.clone();
138        let requests_ref = requests.clone();
139        let observers_ref = observers.clone();
140        let event_handlers_ref = event_handlers.clone();
141        let mpv_ipc_task = tokio::spawn(async move {
142            loop {
143                let res = tokio::select! {
144                    line = line_reader.next_line() => { line },
145                    _ = shutdown_ref.cancelled() => {
146                        trace!("Shutdown cancellation. Breaking main loop.");
147                        break;
148                    }
149                };
150                let Ok(Some(str)) = res else {
151                    warn!("Failed to read from mpv IPC. Assuming mpv shutdown.");
152                    shutdown_ref.cancel();
153                    // TODO: this should also abort tasks etc
154
155                    // Send faked shutdown event to any listeners
156                    if let Some(list) = event_handlers_ref.lock().await.get("shutdown") {
157                        for handler in list {
158                            handler.send(json!({"event": "shutdown"})).await.unwrap();
159                        }
160                    }
161                    break; // stop main loop
162                };
163
164                trace!("<-mpv: {}", str);
165                let json = serde_json::from_str::<serde_json::Value>(str.as_str()).unwrap();
166                if let Ok(mpv_resp) = MpvResponse::deserialize(&json) {
167                    if let Some(tx) = requests_ref.lock().await.remove(&mpv_resp.request_id) {
168                        if mpv_resp.error == "success" {
169                            tx.send(Ok(mpv_resp.data.unwrap_or(serde_json::Value::Null))).unwrap();
170                        } else {
171                            tx.send(Err(anyhow!(mpv_resp.error))).unwrap();
172                        }
173                    } else {
174                        warn!("Unhandled requests ID: {}", mpv_resp.request_id);
175                    }
176                } else if let Some(event) = json.as_object().and_then(|j| j.get("event")).and_then(|j| j.as_str()) {
177                    trace!("Event '{}'", event);
178                    if let Some(list) = event_handlers_ref.lock().await.get(event) {
179                        for handler in list {
180                            handler.send(json.clone()).await.unwrap();
181                        }
182                    }
183                    if event == "property-change" {
184                        let id = json.as_object().unwrap().get("id").unwrap().as_u64().unwrap() as usize;
185                        if let Some(tx) = observers_ref.lock().await.get(&id) {
186                            let data = json.as_object().unwrap().get("data").map(|d| d.to_owned());
187                            tx.send(data).await.unwrap();
188                        } else {
189                            warn!("Unhandled observable ID: {}", id);
190                        }
191                    }
192                    if event == "shutdown" {
193                        info!("Received mpv 'shutdown' event.");
194                        shutdown_ref.cancel();
195                        // TODO: this should also abort tasks etc
196                        break; // stop main loop
197                    }
198                } else {
199                    warn!("Unhandled mpv message: {}", str);
200                }
201            }
202        });
203
204        Ok(Self {
205            shutdown,
206            writer,
207            request_id: 0,
208            requests,
209            observers,
210            event_handlers,
211            tasks: vec![mpv_ipc_task],
212            child: None,
213        })
214    }
215    /// Spawn a new mpv process and attach to it.
216    pub async fn spawn(opt: &MpvSpawnOptions) -> anyhow::Result<Self> {
217        let mpv_path = opt
218            .mpv_path
219            .as_ref()
220            .map(|v| Cow::Borrowed(v))
221            .unwrap_or_else(|| Cow::Owned(mpv_platform::default_mpv_bin()));
222        let ipc_path = opt
223            .ipc_path
224            .as_ref()
225            .map(|v| Cow::Borrowed(v))
226            .unwrap_or_else(|| Cow::Owned(mpv_platform::generate_ipc_path()));
227        let mut args = vec![
228            "--idle".to_owned(),
229            "--input-ipc-server=".to_owned() + &ipc_path.to_string_lossy(),
230        ];
231        if let Some(config_dir) = &opt.config_dir {
232            args.push("--config-dir=".to_owned() + &config_dir.to_string_lossy());
233        }
234        let stdout_mode = || {
235            if opt.inherit_stdout {
236                Stdio::inherit()
237            } else {
238                Stdio::null()
239            }
240        };
241        let child = process::Command::new(mpv_path.as_ref())
242            .args(args)
243            .stdin(Stdio::null())
244            .stdout(stdout_mode())
245            .stderr(stdout_mode())
246            .spawn()
247            .context("Failed to spawn mpv process")?;
248        let child_pid = child.id().unwrap();
249        info!("mpv spawned! pid: {}", child_pid);
250
251        // Connect
252        let mut sself = Self::connect(&ipc_path).await?;
253        sself.child = Some(child);
254
255        // Sanity check
256        let ipc_pid = sself.get_prop::<u32>("pid").await?;
257        if ipc_pid != child_pid {
258            warn!("mpv process pid and mpv ipc pid don't match");
259        }
260
261        Ok(sself)
262    }
263    pub async fn running(&self) -> bool {
264        !self.shutdown.is_cancelled()
265    }
266    /// Send a command to mpv and wait for a reply.
267    /// This should not be used to `quit` because it will never receive a reply. Use the `quit` function instead.
268    pub async fn send_command(&mut self, cmd: serde_json::Value) -> anyhow::Result<serde_json::Value> {
269        if self.shutdown.is_cancelled() {
270            bail!("mpv instance has shut down");
271        }
272        let (tx, rx) = oneshot::channel::<anyhow::Result<serde_json::Value>>();
273        self.request_id += 1;
274        self.requests.lock().await.insert(self.request_id, tx);
275        let str = serde_json::to_string(&MpvCommand {
276            request_id: self.request_id,
277            command: cmd,
278        })
279        .unwrap();
280        trace!("->mpv: {}", str);
281        self.writer.write_all((str + "\n").as_bytes()).await?;
282        tokio::select! {
283            result = rx => result?,
284            _ = self.shutdown.cancelled() => bail!("mpv shutdown"),
285        }
286    }
287    fn abort_tasks(&mut self) {
288        for handle in &self.tasks {
289            handle.abort();
290        }
291        self.tasks.clear();
292    }
293    /// Shuts down the mpv player and disconnects.
294    pub async fn quit(&mut self) {
295        self.abort_tasks();
296        let quit_fut = self.writer.write_all(("{\"command\":[\"quit\"]}\n").as_bytes());
297        _ = tokio::time::timeout(Duration::from_secs(2), quit_fut).await;
298        _ = self.writer.shutdown().await;
299        if let Some(child) = &mut self.child {
300            _ = child.kill();
301        }
302        self.shutdown.cancel();
303    }
304    /// Disconnect from the IPC socket.
305    pub async fn disconnect(&mut self) {
306        self.abort_tasks();
307        _ = self.writer.shutdown().await;
308        self.shutdown.cancel();
309    }
310    pub async fn get_prop<T: DeserializeOwned>(&mut self, name: &str) -> anyhow::Result<T> {
311        self.send_command(json!(["get_property", name]))
312            .await
313            .and_then(|json| T::deserialize(json).map_err(|_| anyhow!("failed to deserialize prop")))
314    }
315    pub async fn set_prop(&mut self, name: &str, value: impl Serialize) -> anyhow::Result<()> {
316        self.send_command(json!(["set_property", name, value]))
317            .await
318            .map(|_| ())
319    }
320    pub async fn watch_event<A, F, Fut>(
321        &mut self,
322        name: impl AsRef<str> + 'static + Send + Sync + Serialize + Display,
323        callback: F,
324    ) where
325        for<'a> Fut: Future<Output = A> + Send + 'a,
326        for<'a> F: (Fn(serde_json::Value) -> Fut) + Send + 'a,
327    {
328        let (json_tx, mut json_rx) = mpsc::channel::<serde_json::Value>(1);
329        let enable = {
330            let mut event_handlers = self.borrow_mut().event_handlers.lock().await;
331            if let Some(list) = event_handlers.get_mut(name.as_ref()) {
332                list.push(json_tx);
333                false
334            } else {
335                _ = event_handlers.insert(name.to_string(), vec![json_tx]);
336                true
337            }
338        };
339        if enable {
340            self.send_command(json!(["enable_event", name])).await.unwrap();
341        }
342        self.tasks.push(tokio::spawn(async move {
343            loop {
344                let json = json_rx.recv().await.unwrap();
345                trace!("Got watched event value '{}': {:?}", name, json);
346                callback(json).await;
347            }
348        }));
349    }
350    pub async fn observe_prop<T: 'static + Send + Sync + Clone + DeserializeOwned>(
351        &mut self,
352        name: impl AsRef<str> + 'static + Send + Sync + Serialize + Display,
353        default: T,
354    ) -> watch::Receiver<T> {
355        // Create observer
356        self.request_id += 1;
357        let id = self.request_id;
358        let (json_tx, mut json_rx) = mpsc::channel::<MpvDataOption>(10);
359        self.observers.lock().await.insert(id, json_tx);
360        self.send_command(json!(["observe_property", id, name])).await.unwrap();
361
362        // Create converter
363        let init_val = self.get_prop(name.as_ref()).await.unwrap_or_else(|_| default.clone());
364        let (t_tx, t_rx) = watch::channel::<T>(init_val);
365        self.tasks.push(tokio::spawn(async move {
366            loop {
367                if let Some(json) = json_rx.recv().await.unwrap() {
368                    trace!("Got observed value '{}': {}", name, json);
369                    if let Ok(val) = T::deserialize(&json) {
370                        _ = t_tx.send(val);
371                    } else {
372                        warn!("Failed to deserialize observable '{}'. Using default.", name);
373                        _ = t_tx.send(default.clone());
374                    }
375                } else {
376                    debug!("Observable '{}' updated without a value. Using default.", name);
377                    _ = t_tx.send(default.clone());
378                }
379            }
380        }));
381        t_rx
382    }
383}
384impl Drop for MpvIpc {
385    fn drop(&mut self) {
386        tokio::task::block_in_place(move || {
387            tokio::runtime::Handle::current().block_on(async {
388                if self.child.is_some() {
389                    self.quit().await;
390                } else {
391                    self.disconnect().await;
392                }
393            });
394        });
395    }
396}