rustybook-messenger 0.2.1

Messenger client for Rustybook
Documentation
pub mod builder;
pub mod context;
pub mod dispatcher;
pub mod events;
pub mod listener;
pub mod models;

use std::sync::Arc;
use std::sync::atomic::{
    AtomicU64,
    Ordering,
};

use rustybook_http::client::Client as HttpClient;
use tokio::sync::{
    Mutex,
    RwLock,
    broadcast,
};
use tokio::task::JoinHandle;
use tokio::time::{
    Duration,
    timeout,
};
use tracing::{
    debug,
    trace,
    warn,
};

#[cfg(feature = "cache")]
use crate::cache::Cache;
use crate::client::context::Context;
use crate::client::events::{
    EventHandler,
    dispatch_event,
};
use crate::client::models::User;
use crate::error::MessengerError;
use crate::gateway::events::Event;
use crate::gateway::lightspeed::codec::encode_request;
use crate::gateway::mqtt::Runtime;
use crate::http::send::{
    PendingRequests,
    build_send_text_payload,
    extract_send_receipt,
};
use crate::model::SendReceipt;
use crate::state::State;

pub use builder::MessengerClientBuilder;

/// Immutable settings consulted when the client creates its session state.
///
/// `ensure_state` picks the cookies-file path when `cookies_file_path` is set and
/// otherwise falls back to the `shared_*` fields, all three of which are then required.
#[derive(Debug, Clone)]
struct MessengerConfig {
    /// Path handed to `State::from_cookies_file`; when `None`, the shared path is used.
    cookies_file_path: Option<String>,
    /// User id handed to `State::from_shared`; required when no cookies file is set.
    shared_user_id: Option<String>,
    /// Cookie header handed to `State::from_shared`; required when no cookies file is set.
    shared_cookie_header: Option<String>,
    /// HTTP client handed to `State::from_shared`; required when no cookies file is set.
    shared_http: Option<HttpClient>,
    /// Optional user agent forwarded to both state constructors.
    user_agent: Option<String>,
    /// Optional proxy forwarded to `State::from_cookies_file` only.
    proxy: Option<String>,
    // NOTE(review): not read in this part of the file; presumably controls the
    // advertised online presence when connecting — confirm against the listener code.
    online: bool,
}

/// Shared state behind every `MessengerClient` clone.
struct Inner {
    /// Settings used by `ensure_state` to build the session state.
    config: MessengerConfig,
    /// Session state; `None` until `ensure_state` has stored a value.
    state: RwLock<Option<State>>,
    /// MQTT runtime used by `send_text` to publish; `None` means the client is not started.
    mqtt: Mutex<Option<Runtime>>,
    // NOTE(review): not touched in this part of the file; presumably the handle of the
    // background listener task managed by the `listener` module — confirm there.
    listener: Mutex<Option<JoinHandle<()>>>,
    /// Handle of the dispatch task spawned by `start_with_handler`.
    handler: Mutex<Option<JoinHandle<()>>>,
    /// Broadcast sender that `subscribe` creates receivers from.
    events: broadcast::Sender<Event>,
    /// Registry in which `send_text` registers a receiver per outbound request id.
    pending: PendingRequests,
    /// Counter incremented once per `send_text` call to produce the request id.
    request_id: AtomicU64,
    /// Counter incremented once per `send_text` call to produce the task id.
    task_id: AtomicU64,
    /// Cache handle, present only when the `cache` feature is enabled.
    #[cfg(feature = "cache")]
    cache: Arc<Cache>,
}

/// Handle to a messenger session.
///
/// Cloning is cheap: every clone points at the same `Inner` through an `Arc`, so all
/// clones observe the same state, event stream and request counters.
#[derive(Clone)]
pub struct MessengerClient {
    /// Reference-counted state shared by all clones of this client.
    inner: Arc<Inner>,
}

impl MessengerClient {
    /// Returns a builder for `MessengerClient`.
    pub fn builder() -> MessengerClientBuilder {
        MessengerClientBuilder::new()
    }

    /// Subscribes to messenger event stream.
    ///
    /// Every call creates a new receiver on the client's broadcast channel.
    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
        self.inner.events.subscribe()
    }

    /// Starts messenger listener and dispatches events to `handler`.
    ///
    /// If a dispatch task is already running this is a no-op and `handler` is dropped
    /// without being invoked. Otherwise the client is started, `handler.ready` is called
    /// once, and every subsequent event except `Event::Listening` is forwarded through
    /// `dispatch_event` until the event channel closes.
    ///
    /// # Errors
    ///
    /// Propagates any error from starting the client, and returns
    /// `MessengerError::NotStarted` when no user id is available after start.
    pub async fn start_with_handler(
        &self,
        handler: Arc<dyn EventHandler>,
    ) -> Result<(), MessengerError> {
        // Fast path: bail out early if a dispatch task is alive, and clear a finished one.
        {
            let mut handler_guard = self.inner.handler.lock().await;
            if let Some(task) = handler_guard.as_ref() {
                if !task.is_finished() {
                    return Ok(());
                }

                handler_guard.take();
            }
        }

        // The receiver is created before `start` so it already exists while the client
        // is starting up.
        let mut receiver = self.subscribe();
        self.start().await?;

        let ready_user = User {
            id: self.uid().await.ok_or(MessengerError::NotStarted)?,
            name: self.name().await,
        };
        let context = Context::new(self.clone());

        // The lock was released while starting, so a concurrent caller may have installed
        // its own dispatch task in the meantime. Re-check under the lock and only spawn
        // when we are still the first; otherwise two tasks would dispatch every event and
        // one handle would be silently overwritten.
        let mut handler_guard = self.inner.handler.lock().await;
        if let Some(existing) = handler_guard.as_ref() {
            if !existing.is_finished() {
                return Ok(());
            }
        }

        let task = tokio::spawn(async move {
            handler.ready(context.clone(), ready_user.clone()).await;

            loop {
                match receiver.recv().await {
                    // `Listening` is intentionally not forwarded to the handler.
                    Ok(Event::Listening) => {}
                    Ok(event) => {
                        dispatch_event(handler.as_ref(), context.clone(), &ready_user, event).await;
                    }
                    // Falling behind only skips events; keep consuming the stream.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!(skipped, "handler lagged behind event stream");
                    }
                    Err(broadcast::error::RecvError::Closed) => break,
                }
            }
        });
        *handler_guard = Some(task);

        Ok(())
    }

    /// Sends a text message to a thread.
    ///
    /// Publishes the encoded request on `/ls_req` and waits up to 10 seconds for the
    /// matching response before extracting the send receipt from it.
    ///
    /// # Errors
    ///
    /// * `MessengerError::Config` when `thread_id` is empty.
    /// * `MessengerError::NotStarted` when no state or no MQTT runtime is available.
    /// * Any error produced while encoding or publishing the request.
    /// * `MessengerError::Timeout` when no response arrives within 10 seconds.
    /// * `MessengerError::Protocol` when the response channel is closed before a reply.
    pub async fn send_text(
        &self,
        thread_id: &str,
        text: &str,
    ) -> Result<SendReceipt, MessengerError> {
        if thread_id.is_empty() {
            return Err(MessengerError::Config("thread_id is required".to_string()));
        }

        // Validate state before allocating ids or registering a response slot, so a
        // not-started client fails without leaving anything behind.
        let state = {
            let state_guard = self.inner.state.read().await;
            state_guard.clone().ok_or(MessengerError::NotStarted)?
        };

        // Relaxed is sufficient: the counters only need to hand out distinct values.
        let request_id = self.inner.request_id.fetch_add(1, Ordering::Relaxed);
        let task_id = self.inner.task_id.fetch_add(1, Ordering::Relaxed);

        let payload = build_send_text_payload(
            request_id,
            task_id,
            thread_id,
            text,
            &state.ls_app_id,
            &state.ls_version_id,
        );
        let payload = encode_request(&payload)?;

        // Register only once the payload is ready, but still before publishing, so the
        // slot exists by the time a response can arrive.
        // NOTE(review): a failed publish or a timeout still leaves this slot registered;
        // confirm whether `PendingRequests` offers a way to drop it.
        let receiver = self.inner.pending.register(request_id).await;

        {
            let mqtt_guard = self.inner.mqtt.lock().await;
            let Some(mqtt) = mqtt_guard.as_ref() else {
                return Err(MessengerError::NotStarted);
            };

            mqtt.publish("/ls_req", payload).await?;
        }

        let response = timeout(Duration::from_secs(10), receiver)
            .await
            .map_err(|_| MessengerError::Timeout)?
            .map_err(|_| MessengerError::Protocol("request receiver closed".to_string()))?;

        debug!(
            request_id = response.request_id,
            "received response for outbound message"
        );
        Ok(extract_send_receipt(thread_id, response.payload))
    }

    /// Returns current logged-in user id if available.
    ///
    /// Yields `None` while no state has been initialized.
    pub async fn uid(&self) -> Option<String> {
        let state_guard = self.inner.state.read().await;
        state_guard.as_ref().map(|state| state.user_id.clone())
    }

    /// Returns current logged-in user name if available.
    ///
    /// Yields `None` while no state has been initialized or when the state has no name.
    pub async fn name(&self) -> Option<String> {
        let state_guard = self.inner.state.read().await;
        state_guard
            .as_ref()
            .and_then(|state| state.user_name.clone())
    }

    /// Makes sure session state exists, creating it from the configuration if needed.
    ///
    /// Returns immediately when state is already present. Otherwise state is built from
    /// the configured cookies file, or from the shared user id, cookie header and HTTP
    /// client when no cookies file is configured.
    ///
    /// # Errors
    ///
    /// Returns `MessengerError::Config` when a required shared value is missing, and
    /// propagates any error from the `State` constructors.
    pub async fn ensure_state(&self) -> Result<(), MessengerError> {
        {
            let state_guard = self.inner.state.read().await;
            if state_guard.is_some() {
                return Ok(());
            }
        }

        let state =
            if let Some(path) = self.inner.config.cookies_file_path.as_deref() {
                State::from_cookies_file(
                    path,
                    self.inner.config.user_agent.as_deref(),
                    self.inner.config.proxy.as_deref(),
                )
                .await?
            } else {
                State::from_shared(
                    self.inner.config.shared_user_id.clone().ok_or_else(|| {
                        MessengerError::Config("missing shared user_id".to_string())
                    })?,
                    self.inner
                        .config
                        .shared_cookie_header
                        .clone()
                        .ok_or_else(|| {
                            MessengerError::Config("missing shared cookie_header".to_string())
                        })?,
                    self.inner.config.shared_http.clone().ok_or_else(|| {
                        MessengerError::Config("missing shared http client".to_string())
                    })?,
                    self.inner.config.user_agent.as_deref(),
                )
                .await?
            };
        trace!(
            user_id = state.user_id,
            has_name = state.user_name.is_some(),
            has_fb_dtsg = state.fb_dtsg.is_some(),
            has_lsd = state.lsd.is_some(),
            has_jazoest = state.jazoest.is_some(),
            has_client_revision = state.client_revision.is_some(),
            "state initialized from cookies"
        );

        // The read lock was released while building, so another caller may have stored
        // state already. Keep the first value rather than replacing state that other
        // callers may already have observed.
        let mut state_guard = self.inner.state.write().await;
        if state_guard.is_none() {
            *state_guard = Some(state);
        }

        Ok(())
    }
}