1use anyhow::{anyhow, bail, Context};
2use log::{debug, info, trace, warn};
3use serde::de::DeserializeOwned;
4use serde::{Deserialize, Serialize};
5use serde_json::json;
6use std::borrow::{BorrowMut, Cow};
7use std::collections::HashMap;
8use std::fmt::Display;
9use std::future::Future;
10use std::path::PathBuf;
11use std::process::Stdio;
12use std::sync::Arc;
13use std::time::{Duration, SystemTime};
14use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader, Lines, WriteHalf};
15use tokio::process::Child;
16use tokio::sync::{mpsc, oneshot, watch, Mutex};
17use tokio::task::JoinHandle;
18use tokio::{process, time};
19use tokio_util::sync::CancellationToken;
20
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn unix_timestamp() -> u64 {
    // `elapsed()` on the epoch measures "now - epoch", i.e. the same duration
    // as asking the current time how far it is from the epoch.
    let since_epoch = SystemTime::UNIX_EPOCH.elapsed().unwrap();
    since_epoch.as_secs()
}
27
/// Windows flavour of the platform layer: mpv IPC goes through a named pipe.
#[cfg(target_os = "windows")]
mod mpv_platform {
    use crate::unix_timestamp;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeClient};

    /// Stream type used for the IPC connection on this platform.
    pub type Stream = NamedPipeClient;

    /// Per-process sequence number mixed into generated pipe names.
    static NEXT_IPC_ID: AtomicUsize = AtomicUsize::new(0);

    /// Opens the named pipe at `path`.
    ///
    /// The underlying error is discarded; callers only need to know whether
    /// the attempt succeeded so they can retry.
    pub async fn connect(path: &PathBuf) -> Result<Stream, ()> {
        ClientOptions::new().open(path).map_err(|_| ())
    }

    /// Generates a pipe name for a new mpv instance.
    ///
    /// The timestamp alone only has one-second resolution, so the process id
    /// and a per-process counter are included to keep names distinct when
    /// several instances are started within the same second.
    pub fn generate_ipc_path() -> PathBuf {
        let seq = NEXT_IPC_ID.fetch_add(1, Ordering::Relaxed);
        format!(
            "\\\\.\\pipe\\mpv_ipc_{}_{}_{}",
            std::process::id(),
            unix_timestamp(),
            seq
        )
        .into()
    }

    /// Name of the mpv executable looked up when no explicit path is given.
    pub fn default_mpv_bin() -> PathBuf {
        "mpv.exe".into()
    }
}
45#[cfg(not(target_os = "windows"))]
46mod mpv_platform {
47 use crate::unix_timestamp;
48 use std::path::PathBuf;
49 use tokio::net::UnixStream;
50 pub type Stream = UnixStream;
51 pub async fn connect(path: &PathBuf) -> Result<Stream, ()> {
52 UnixStream::connect(&path).await.or(Err(()))
53 }
54 pub fn generate_ipc_path() -> PathBuf {
55 let dir = std::env::temp_dir();
56 dir.join(format!("mpv_ipc_{}.sock", unix_timestamp()))
57 }
58 pub fn default_mpv_bin() -> PathBuf {
59 "mpv".into()
60 }
61}
62
/// A single request as serialized to JSON and written to the mpv IPC stream
/// by `MpvIpc::send_command`.
#[derive(Serialize, Deserialize)]
struct MpvCommand {
    /// Identifier used by the reader task to match the reply to this request.
    request_id: usize,
    /// Command payload; callers in this file pass JSON arrays such as
    /// `["get_property", name]`.
    command: serde_json::Value,
}
68
/// A reply to a previously sent command.
///
/// The reader task tries to parse every incoming line as this type first;
/// lines that do not fit this shape are then examined for an `event` field.
#[derive(Serialize, Deserialize, Debug)]
struct MpvResponse {
    /// Identifier of the request this reply belongs to.
    request_id: usize,
    /// Optional result payload; a missing payload is reported to the caller
    /// as JSON `null`.
    data: MpvDataOption,
    /// Status string; the reader treats exactly `"success"` as success and
    /// any other value as the error message.
    error: String,
}
75
/// Shared, async-locked map keyed by a numeric id (request ids and observer
/// ids in this file).
type LockedMpvIdMap<T> = Arc<Mutex<HashMap<usize, T>>>;
/// Optional JSON payload, used for the `data` field of replies and of
/// `property-change` events.
type MpvDataOption = Option<serde_json::Value>;
78
/// Options controlling how `MpvIpc::spawn` launches mpv.
///
/// `Default` leaves every path unset and does not inherit stdout/stderr.
#[derive(Clone, Debug, Default)]
pub struct MpvSpawnOptions {
    /// Path of the mpv executable; the platform default binary name is used
    /// when `None`.
    pub mpv_path: Option<PathBuf>,
    /// Path passed to mpv as `--input-ipc-server`; a fresh path is generated
    /// when `None`.
    pub ipc_path: Option<PathBuf>,
    /// Directory passed to mpv as `--config-dir`; the flag is omitted when
    /// `None`.
    pub config_dir: Option<PathBuf>,
    /// When `true`, mpv's stdout and stderr are inherited from this process;
    /// otherwise both are discarded.
    pub inherit_stdout: bool,
}
96
/// Handle to an mpv instance controlled over its JSON IPC interface.
///
/// Created either by attaching to an existing endpoint (`connect`) or by
/// launching a new mpv process (`spawn`).
pub struct MpvIpc {
    /// Cancelled once the connection is considered finished: on read failure,
    /// on mpv's `shutdown` event, or after `quit`/`disconnect`.
    shutdown: CancellationToken,
    /// Write half of the IPC stream; commands are written here one per line.
    writer: WriteHalf<mpv_platform::Stream>,
    /// Last id handed out; incremented for every command and every property
    /// observer.
    request_id: usize,
    /// Requests still waiting for their reply, keyed by request id.
    requests: LockedMpvIdMap<oneshot::Sender<anyhow::Result<serde_json::Value>>>,
    /// Event listeners registered through `watch_event`, keyed by event name.
    event_handlers: Arc<Mutex<HashMap<String, Vec<mpsc::Sender<serde_json::Value>>>>>,
    /// Property observers registered through `observe_prop`, keyed by
    /// observer id.
    observers: LockedMpvIdMap<mpsc::Sender<MpvDataOption>>,
    /// Background tasks (the IPC reader plus one task per watcher/observer);
    /// aborted by `quit` and `disconnect`.
    tasks: Vec<JoinHandle<()>>,
    /// The mpv process when it was started by `spawn`; `None` when attached
    /// with `connect`.
    child: Option<Child>,
}
107impl MpvIpc {
108 pub async fn connect(ipc_path: &PathBuf) -> anyhow::Result<Self> {
110 let (mut line_reader, writer): (Lines<_>, WriteHalf<_>) = async {
112 for n in 0..10 {
113 if n > 0 {
114 time::sleep(Duration::from_millis(100) * n).await;
115 }
116 if let Ok(stream) = mpv_platform::connect(ipc_path).await {
117 debug!("Connected to mpv socket");
118 let (reader, writer) = io::split(stream);
119 let line_reader = BufReader::new(reader).lines();
120 return Ok((line_reader, writer));
121 }
122 }
123 bail!("failed to connect to mpv socket");
124 }
125 .await?;
126
127 let requests = Arc::new(Mutex::new(HashMap::<
128 usize,
129 oneshot::Sender<anyhow::Result<serde_json::Value>>,
130 >::new()));
131 let observers = Arc::new(Mutex::new(HashMap::<usize, mpsc::Sender<MpvDataOption>>::new()));
132 let event_handlers = Arc::new(Mutex::new(
133 HashMap::<String, Vec<mpsc::Sender<serde_json::Value>>>::new(),
134 ));
135 let shutdown = CancellationToken::new();
136
137 let shutdown_ref = shutdown.clone();
138 let requests_ref = requests.clone();
139 let observers_ref = observers.clone();
140 let event_handlers_ref = event_handlers.clone();
141 let mpv_ipc_task = tokio::spawn(async move {
142 loop {
143 let res = tokio::select! {
144 line = line_reader.next_line() => { line },
145 _ = shutdown_ref.cancelled() => {
146 trace!("Shutdown cancellation. Breaking main loop.");
147 break;
148 }
149 };
150 let Ok(Some(str)) = res else {
151 warn!("Failed to read from mpv IPC. Assuming mpv shutdown.");
152 shutdown_ref.cancel();
153 if let Some(list) = event_handlers_ref.lock().await.get("shutdown") {
157 for handler in list {
158 handler.send(json!({"event": "shutdown"})).await.unwrap();
159 }
160 }
161 break; };
163
164 trace!("<-mpv: {}", str);
165 let json = serde_json::from_str::<serde_json::Value>(str.as_str()).unwrap();
166 if let Ok(mpv_resp) = MpvResponse::deserialize(&json) {
167 if let Some(tx) = requests_ref.lock().await.remove(&mpv_resp.request_id) {
168 if mpv_resp.error == "success" {
169 tx.send(Ok(mpv_resp.data.unwrap_or(serde_json::Value::Null))).unwrap();
170 } else {
171 tx.send(Err(anyhow!(mpv_resp.error))).unwrap();
172 }
173 } else {
174 warn!("Unhandled requests ID: {}", mpv_resp.request_id);
175 }
176 } else if let Some(event) = json.as_object().and_then(|j| j.get("event")).and_then(|j| j.as_str()) {
177 trace!("Event '{}'", event);
178 if let Some(list) = event_handlers_ref.lock().await.get(event) {
179 for handler in list {
180 handler.send(json.clone()).await.unwrap();
181 }
182 }
183 if event == "property-change" {
184 let id = json.as_object().unwrap().get("id").unwrap().as_u64().unwrap() as usize;
185 if let Some(tx) = observers_ref.lock().await.get(&id) {
186 let data = json.as_object().unwrap().get("data").map(|d| d.to_owned());
187 tx.send(data).await.unwrap();
188 } else {
189 warn!("Unhandled observable ID: {}", id);
190 }
191 }
192 if event == "shutdown" {
193 info!("Received mpv 'shutdown' event.");
194 shutdown_ref.cancel();
195 break; }
198 } else {
199 warn!("Unhandled mpv message: {}", str);
200 }
201 }
202 });
203
204 Ok(Self {
205 shutdown,
206 writer,
207 request_id: 0,
208 requests,
209 observers,
210 event_handlers,
211 tasks: vec![mpv_ipc_task],
212 child: None,
213 })
214 }
215 pub async fn spawn(opt: &MpvSpawnOptions) -> anyhow::Result<Self> {
217 let mpv_path = opt
218 .mpv_path
219 .as_ref()
220 .map(|v| Cow::Borrowed(v))
221 .unwrap_or_else(|| Cow::Owned(mpv_platform::default_mpv_bin()));
222 let ipc_path = opt
223 .ipc_path
224 .as_ref()
225 .map(|v| Cow::Borrowed(v))
226 .unwrap_or_else(|| Cow::Owned(mpv_platform::generate_ipc_path()));
227 let mut args = vec![
228 "--idle".to_owned(),
229 "--input-ipc-server=".to_owned() + &ipc_path.to_string_lossy(),
230 ];
231 if let Some(config_dir) = &opt.config_dir {
232 args.push("--config-dir=".to_owned() + &config_dir.to_string_lossy());
233 }
234 let stdout_mode = || {
235 if opt.inherit_stdout {
236 Stdio::inherit()
237 } else {
238 Stdio::null()
239 }
240 };
241 let child = process::Command::new(mpv_path.as_ref())
242 .args(args)
243 .stdin(Stdio::null())
244 .stdout(stdout_mode())
245 .stderr(stdout_mode())
246 .spawn()
247 .context("Failed to spawn mpv process")?;
248 let child_pid = child.id().unwrap();
249 info!("mpv spawned! pid: {}", child_pid);
250
251 let mut sself = Self::connect(&ipc_path).await?;
253 sself.child = Some(child);
254
255 let ipc_pid = sself.get_prop::<u32>("pid").await?;
257 if ipc_pid != child_pid {
258 warn!("mpv process pid and mpv ipc pid don't match");
259 }
260
261 Ok(sself)
262 }
263 pub async fn running(&self) -> bool {
264 !self.shutdown.is_cancelled()
265 }
266 pub async fn send_command(&mut self, cmd: serde_json::Value) -> anyhow::Result<serde_json::Value> {
269 if self.shutdown.is_cancelled() {
270 bail!("mpv instance has shut down");
271 }
272 let (tx, rx) = oneshot::channel::<anyhow::Result<serde_json::Value>>();
273 self.request_id += 1;
274 self.requests.lock().await.insert(self.request_id, tx);
275 let str = serde_json::to_string(&MpvCommand {
276 request_id: self.request_id,
277 command: cmd,
278 })
279 .unwrap();
280 trace!("->mpv: {}", str);
281 self.writer.write_all((str + "\n").as_bytes()).await?;
282 tokio::select! {
283 result = rx => result?,
284 _ = self.shutdown.cancelled() => bail!("mpv shutdown"),
285 }
286 }
287 fn abort_tasks(&mut self) {
288 for handle in &self.tasks {
289 handle.abort();
290 }
291 self.tasks.clear();
292 }
293 pub async fn quit(&mut self) {
295 self.abort_tasks();
296 let quit_fut = self.writer.write_all(("{\"command\":[\"quit\"]}\n").as_bytes());
297 _ = tokio::time::timeout(Duration::from_secs(2), quit_fut).await;
298 _ = self.writer.shutdown().await;
299 if let Some(child) = &mut self.child {
300 _ = child.kill();
301 }
302 self.shutdown.cancel();
303 }
304 pub async fn disconnect(&mut self) {
306 self.abort_tasks();
307 _ = self.writer.shutdown().await;
308 self.shutdown.cancel();
309 }
310 pub async fn get_prop<T: DeserializeOwned>(&mut self, name: &str) -> anyhow::Result<T> {
311 self.send_command(json!(["get_property", name]))
312 .await
313 .and_then(|json| T::deserialize(json).map_err(|_| anyhow!("failed to deserialize prop")))
314 }
315 pub async fn set_prop(&mut self, name: &str, value: impl Serialize) -> anyhow::Result<()> {
316 self.send_command(json!(["set_property", name, value]))
317 .await
318 .map(|_| ())
319 }
320 pub async fn watch_event<A, F, Fut>(
321 &mut self,
322 name: impl AsRef<str> + 'static + Send + Sync + Serialize + Display,
323 callback: F,
324 ) where
325 for<'a> Fut: Future<Output = A> + Send + 'a,
326 for<'a> F: (Fn(serde_json::Value) -> Fut) + Send + 'a,
327 {
328 let (json_tx, mut json_rx) = mpsc::channel::<serde_json::Value>(1);
329 let enable = {
330 let mut event_handlers = self.borrow_mut().event_handlers.lock().await;
331 if let Some(list) = event_handlers.get_mut(name.as_ref()) {
332 list.push(json_tx);
333 false
334 } else {
335 _ = event_handlers.insert(name.to_string(), vec![json_tx]);
336 true
337 }
338 };
339 if enable {
340 self.send_command(json!(["enable_event", name])).await.unwrap();
341 }
342 self.tasks.push(tokio::spawn(async move {
343 loop {
344 let json = json_rx.recv().await.unwrap();
345 trace!("Got watched event value '{}': {:?}", name, json);
346 callback(json).await;
347 }
348 }));
349 }
350 pub async fn observe_prop<T: 'static + Send + Sync + Clone + DeserializeOwned>(
351 &mut self,
352 name: impl AsRef<str> + 'static + Send + Sync + Serialize + Display,
353 default: T,
354 ) -> watch::Receiver<T> {
355 self.request_id += 1;
357 let id = self.request_id;
358 let (json_tx, mut json_rx) = mpsc::channel::<MpvDataOption>(10);
359 self.observers.lock().await.insert(id, json_tx);
360 self.send_command(json!(["observe_property", id, name])).await.unwrap();
361
362 let init_val = self.get_prop(name.as_ref()).await.unwrap_or_else(|_| default.clone());
364 let (t_tx, t_rx) = watch::channel::<T>(init_val);
365 self.tasks.push(tokio::spawn(async move {
366 loop {
367 if let Some(json) = json_rx.recv().await.unwrap() {
368 trace!("Got observed value '{}': {}", name, json);
369 if let Ok(val) = T::deserialize(&json) {
370 _ = t_tx.send(val);
371 } else {
372 warn!("Failed to deserialize observable '{}'. Using default.", name);
373 _ = t_tx.send(default.clone());
374 }
375 } else {
376 debug!("Observable '{}' updated without a value. Using default.", name);
377 _ = t_tx.send(default.clone());
378 }
379 }
380 }));
381 t_rx
382 }
383}
384impl Drop for MpvIpc {
385 fn drop(&mut self) {
386 tokio::task::block_in_place(move || {
387 tokio::runtime::Handle::current().block_on(async {
388 if self.child.is_some() {
389 self.quit().await;
390 } else {
391 self.disconnect().await;
392 }
393 });
394 });
395 }
396}