Skip to main content

filthy_rich/
ipc.rs

1// SPDX-License-Identifier: MIT
2
3use std::time::Duration;
4
5use anyhow::{Result, bail};
6use tokio::{
7    runtime::{Builder, Runtime},
8    sync::mpsc,
9    task::JoinHandle,
10    time::sleep,
11};
12use uuid::Uuid;
13
14use crate::{
15    socket::DiscordIPCSocket,
16    utils::{get_current_timestamp, pack},
17};
18
19/// Commands sent to the IPC task.
20#[derive(Debug)]
21enum IPCCommand {
22    SetActivity { details: String, state: String },
23}
24
25/// Async Discord IPC client.
26#[derive(Debug, Clone)]
27pub struct DiscordIPC {
28    tx: mpsc::Sender<IPCCommand>,
29    client_id: String,
30    timestamp: u64,
31}
32
33impl DiscordIPC {
34    /// Create a new IPC instance (does NOT start connection).
35    /// To start a connection and run the client, use `.run()`.
36    pub async fn new(client_id: &str) -> Result<Self> {
37        let (tx, _rx) = mpsc::channel(32);
38
39        Ok(Self {
40            tx,
41            client_id: client_id.to_string(),
42            timestamp: get_current_timestamp()?,
43        })
44    }
45
46    /// Connect, handshake, wait for READY and  start the IPC client.
47    pub async fn run(&mut self) -> Result<JoinHandle<Result<()>>> {
48        let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
49        self.tx = tx;
50
51        let client_id = self.client_id.clone();
52        let timestamp = self.timestamp;
53
54        let handle = tokio::spawn(async move {
55            let mut backoff = 1;
56            let mut last_activity: Option<(String, String)> = None;
57
58            loop {
59                // initial connect
60                let mut socket = match DiscordIPCSocket::new().await {
61                    Ok(s) => s,
62                    Err(_) => {
63                        sleep(Duration::from_secs(backoff)).await;
64                        continue;
65                    }
66                };
67
68                // handshake
69                let handshake = format!(r#"{{"v":1,"client_id":"{}"}}"#, client_id);
70
71                let packed = pack(0, handshake.len() as u32)?;
72                if socket.write(&packed).await.is_err()
73                    || socket.write(handshake.as_bytes()).await.is_err()
74                {
75                    sleep(Duration::from_secs(backoff)).await;
76                    continue;
77                }
78
79                // wait for ready
80                loop {
81                    let frame = match socket.read_frame().await {
82                        Ok(f) => f,
83                        Err(_) => break,
84                    };
85
86                    if frame.opcode == 1 && frame.body.windows(5).any(|w| w == b"READY") {
87                        break;
88                    }
89                }
90
91                // replay activity after socket
92                if let Some((details, state)) = &last_activity {
93                    let _ = send_activity(&mut socket, details, state, timestamp).await;
94                }
95
96                backoff = 1;
97
98                // main loop
99                loop {
100                    tokio::select! {
101                        Some(cmd) = rx.recv() => {
102                            match cmd {
103                                IPCCommand::SetActivity { details, state } => {
104                                    last_activity = Some((details.clone(), state.clone()));
105
106                                    if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
107                                        break;
108                                    }
109                                }
110                            }
111                        }
112
113                        frame = socket.read_frame() => {
114                            match frame {
115                                Ok(frame) => {
116                                    match frame.opcode {
117                                        // PING
118                                        3 => {
119                                            let packed = pack(frame.opcode, frame.body.len() as u32)?;
120                                            socket.write(&packed).await?;
121                                            socket.write(&frame.body).await?;
122                                        }
123                                        // CLOSE
124                                        2 => break,
125                                        _ => {}
126                                    }
127                                }
128                                Err(_) => break,
129                            }
130                        }
131                    }
132                }
133
134                sleep(Duration::from_secs(backoff)).await;
135                backoff = (backoff * 2).min(4);
136            }
137        });
138
139        Ok(handle)
140    }
141
142    /// Sets the Discord Rich presence activity.
143    pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
144        self.tx
145            .send(IPCCommand::SetActivity {
146                details: details.to_string(),
147                state: state.to_string(),
148            })
149            .await?;
150
151        Ok(())
152    }
153}
154
155/// Helper to send activity JSON.
156async fn send_activity(
157    socket: &mut DiscordIPCSocket,
158    details: &str,
159    state: &str,
160    timestamp: u64,
161) -> Result<()> {
162    let pid = std::process::id();
163    let uuid = Uuid::new_v4();
164
165    let json = format!(
166        r#"{{
167    "cmd":"SET_ACTIVITY",
168    "args": {{
169        "pid": {},
170        "activity": {{
171            "details":"{}",
172            "state":"{}",
173            "timestamps": {{
174                "start": {}
175            }}
176        }}
177    }},
178    "nonce":"{}"
179}}"#,
180        pid, details, state, timestamp, uuid
181    );
182
183    let packed = pack(1, json.len() as u32)?;
184    socket.write(&packed).await?;
185    socket.write(json.as_bytes()).await?;
186
187    Ok(())
188}
189
190/// Blocking wrapper around DiscordIPC.
191#[derive(Debug)]
192pub struct DiscordIPCSync {
193    inner: DiscordIPC,
194    rt: Runtime,
195    ipc_task: Option<JoinHandle<Result<()>>>,
196}
197
198impl DiscordIPCSync {
199    pub fn new(client_id: &str) -> Result<Self> {
200        let rt = Builder::new_multi_thread().enable_all().build()?;
201        let inner = rt.block_on(DiscordIPC::new(client_id))?;
202
203        Ok(Self {
204            inner,
205            rt,
206            ipc_task: None,
207        })
208    }
209
210    pub fn run(&mut self) -> Result<()> {
211        if self.ipc_task.is_some() {
212            bail!("run() called multiple times");
213        }
214
215        let handle = self.rt.block_on(self.inner.run())?;
216        self.ipc_task = Some(handle);
217        Ok(())
218    }
219
220    pub fn wait(&mut self) -> Result<()> {
221        if let Some(handle) = self.ipc_task.take() {
222            self.rt.block_on(handle)??;
223        }
224        Ok(())
225    }
226
227    pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
228        self.rt.block_on(self.inner.set_activity(details, state))
229    }
230}