filthy-rich 0.8.3

Tiny, ergonomic Discord Rich Presence library for your Rust apps.
Documentation
use std::{
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
    time::Duration,
};
use tokio::{sync::mpsc, task::JoinHandle, time::sleep};

use anyhow::{Result, anyhow, bail};

use crate::{
    PresenceClient,
    socket::DiscordSock,
    types::{Activity, IPCCommand, ReadyData, RpcFrame},
    utils::get_current_timestamp,
};

/// Error message produced by `PresenceRunner::run()` when it is invoked again after an
/// earlier call already started (or consumed the receiver for) the background task.
const MULTIPLE_RUN_CALL_ERR: &str = "PresenceRunner::run() called more than once";

/// A runner that manages the Discord RPC background task.
///
/// Typical flow: create a runner with `new()`, optionally register an `on_ready()` hook,
/// call `run()` to spawn the task and get a client handle, then clone that handle
/// (`clone_handle()`) wherever it needs to be shared.
pub struct PresenceRunner {
    /// Receiving end of the command channel; `run()` takes it and moves it into the task,
    /// so it is `None` once the runner has been started.
    rx: Option<tokio::sync::mpsc::Receiver<IPCCommand>>,
    /// Client handle returned by `run()` and cloned by `clone_handle()`.
    client: PresenceClient,
    /// Handle of the spawned background task; set by `run()`, consumed by `wait()`.
    join_handle: Option<JoinHandle<Result<()>>>,
    /// Optional hook invoked with the READY payload; `run()` moves it into the task.
    on_ready: Option<Box<dyn Fn(ReadyData) + Send + Sync + 'static>>,
}

impl PresenceRunner {
    pub fn new(client_id: &str) -> Self {
        let (tx, rx) = mpsc::channel(32);
        let client = PresenceClient {
            tx,
            client_id: client_id.to_string(),
            running: Arc::new(AtomicBool::new(false)),
        };

        Self {
            rx: Some(rx),
            client,
            join_handle: None,
            on_ready: None,
        }
    }

    /// Run a particular closure after receiving the READY event from the local Discord IPC server.
    pub fn on_ready<F: Fn(ReadyData) + Send + Sync + 'static>(mut self, f: F) -> Self {
        self.on_ready = Some(Box::new(f));
        self
    }

    /// Run the runner.
    /// Must be called before any client handle operations.
    pub async fn run(&mut self, wait_for_ready: bool) -> Result<&PresenceClient> {
        if self.client.running.swap(true, Ordering::SeqCst) {
            bail!(MULTIPLE_RUN_CALL_ERR)
        }

        let client_id = self.client.client_id.clone();
        let running = self.client.running.clone();
        let mut rx = self
            .rx
            .take()
            .ok_or_else(|| anyhow!(MULTIPLE_RUN_CALL_ERR))?;

        // oneshot channel to signal when READY is received the first time
        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();

        let on_ready = self.on_ready.take();

        let join_handle = tokio::spawn(async move {
            let mut backoff = 1;
            let mut last_activity: Option<Activity> = None;
            let mut ready_tx = Some(ready_tx);

            'outer: while running.load(Ordering::SeqCst) {
                // initial connect
                let mut socket = match DiscordSock::new().await {
                    Ok(s) => s,
                    Err(_) => {
                        sleep(Duration::from_secs(backoff)).await;
                        continue;
                    }
                };

                // initial handshake
                if socket.do_handshake(&client_id).await.is_err() {
                    sleep(Duration::from_secs(backoff)).await;
                    continue;
                }

                loop {
                    let frame = match socket.read_frame().await {
                        Ok(f) => f,
                        Err(_) => {
                            break;
                        }
                    };

                    if frame.opcode != 1 {
                        continue;
                    }

                    if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
                        if json.cmd.as_deref() == Some("DISPATCH")
                            && json.evt.as_deref() == Some("READY")
                        {
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(());
                            }
                            if let Some(f) = &on_ready {
                                if let Some(data) = json.data {
                                    f(data);
                                }
                            }
                            break;
                        }

                        if json.evt.as_deref() == Some("ERROR") {
                            eprintln!("Discord RPC error: {:?}", json.data);
                        }
                    }
                }

                let session_start = get_current_timestamp()?;

                if let Some(activity) = &last_activity {
                    let _ = socket.send_activity(activity.clone(), session_start).await;
                }

                backoff = 1;

                loop {
                    tokio::select! {
                        biased;

                        cmd = rx.recv() => {
                            match cmd {
                                Some(cmd) => {
                                    match cmd {
                                        IPCCommand::SetActivity { activity } => {
                                            last_activity = Some(activity.clone());

                                            if socket.send_activity(activity, session_start).await.is_err() {
                                                break;
                                            }
                                        },
                                        IPCCommand::ClearActivity => {
                                            last_activity = None;

                                            if socket.clear_activity().await.is_err() { break; }
                                        },
                                        IPCCommand::Close => {
                                            let _ = socket.close().await;
                                            running.store(false, Ordering::SeqCst);
                                            break 'outer;
                                        }
                                    }
                                },
                                None => break,
                            }
                        }

                        frame = socket.read_frame() => {
                            match frame {
                                Ok(frame) => match frame.opcode {
                                    1 => {
                                        if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
                                            if json.evt.as_deref() == Some("ERROR") {
                                                eprintln!("Discord RPC error: {:?}", json.data);
                                            }
                                        }
                                    }
                                    2 => break,
                                    3 => {
                                        if socket.send_frame(3, frame.body).await.is_err() { break; }
                                    }
                                    _ => {}
                                },
                                Err(_) => break,
                            }
                        }
                    }
                }

                sleep(Duration::from_secs(backoff)).await;
                backoff = (backoff * 2).min(4);
            }

            Ok(())
        });

        self.join_handle = Some(join_handle);

        if wait_for_ready {
            match ready_rx.await {
                Ok(()) => (),
                Err(_) => bail!("Background task exited before READY."),
            }
        }

        Ok(&self.client)
    }

    /// Returns a clone of the client handle for sharing.
    pub fn clone_handle(&self) -> PresenceClient {
        self.client.clone()
    }

    /// Waits for the IPC task to finish.
    pub async fn wait(&mut self) -> Result<()> {
        if let Some(handle) = self.join_handle.take() {
            handle.await??;
        }

        Ok(())
    }
}