#![doc = include_str!("../README.md")]
mod auth;
mod error;
mod events;
mod graphql;
mod lightspeed;
mod model;
mod mqtt;
mod parser;
mod send;
mod state;
pub use error::MessengerError;
pub use events::{
Event,
EventError,
MessageEvent,
PresenceEvent,
TypingEvent,
};
pub use model::SendReceipt;
use std::sync::Arc;
use std::sync::atomic::{
AtomicU64,
Ordering,
};
use rustybook_http::client::Client as HttpClient;
use tokio::sync::{
Mutex,
RwLock,
broadcast,
};
use tokio::task::JoinHandle;
use tokio::time::{
Duration,
timeout,
};
use tracing::{
debug,
info,
trace,
warn,
};
use crate::events::EventKind;
use crate::lightspeed::encode_request;
use crate::mqtt::MqttRuntime;
use crate::parser::parse_events;
use crate::send::{
PendingRequests,
build_send_text_payload,
extract_send_receipt,
};
use crate::state::State;
/// Settings captured once by `MessengerClientBuilder::build` and stored in `Inner`.
///
/// `build` rejects configurations that set both `cookies_file_path` and any
/// `shared_*` field, or that set only some of the `shared_*` fields.
#[derive(Debug, Clone)]
struct MessengerConfig {
/// When set, `ensure_state` passes this path to `State::from_cookies_file`.
cookies_file_path: Option<String>,
/// User id passed to `State::from_shared` when no cookies file is configured.
shared_user_id: Option<String>,
/// Cookie header passed to `State::from_shared` when no cookies file is configured.
shared_cookie_header: Option<String>,
/// HTTP client passed to `State::from_shared` when no cookies file is configured.
shared_http: Option<HttpClient>,
/// Optional user agent forwarded to both `State` constructors.
user_agent: Option<String>,
/// Optional proxy forwarded to `State::from_cookies_file` only.
proxy: Option<String>,
/// Flag forwarded to `MqttRuntime::connect` and `initialize_session`.
online: bool,
}
/// Shared state behind every clone of `MessengerClient`.
struct Inner {
/// Configuration fixed at build time.
config: MessengerConfig,
/// Session state; `None` until `ensure_state` has stored one.
state: RwLock<Option<State>>,
/// MQTT runtime; set by `start_listening`, taken by `stop_listening`.
mqtt: Mutex<Option<MqttRuntime>>,
/// Handle of the spawned dispatcher task; set by `start_listening`,
/// taken and aborted by `stop_listening`.
listener: Mutex<Option<JoinHandle<()>>>,
/// Broadcast sender used to deliver `Event`s to subscribers.
events: broadcast::Sender<Event>,
/// Outstanding requests; `send_text` registers, the dispatcher resolves
/// them from `/ls_resp` payloads.
pending: PendingRequests,
/// Next request id; seeded in `build` with a random value masked to 63 bits.
request_id: AtomicU64,
/// Next task id; starts at 1.
task_id: AtomicU64,
}
/// Handle to a Messenger session.
///
/// Cloning copies the `Arc`, so all clones share the same state, MQTT
/// runtime, listener task and event channel.
#[derive(Clone)]
pub struct MessengerClient {
/// Reference-counted shared state.
inner: Arc<Inner>,
}
/// Builder for `MessengerClient`.
///
/// Exactly one session source must be configured before calling `build`:
/// either `cookies_file_path` or a complete `shared_session`.
#[derive(Debug, Clone)]
pub struct MessengerClientBuilder {
/// Set by `cookies_file_path`.
cookies_file_path: Option<String>,
/// Set by `shared_session`.
shared_user_id: Option<String>,
/// Set by `shared_session`.
shared_cookie_header: Option<String>,
/// Set by `shared_session`.
shared_http: Option<HttpClient>,
/// Set by `user_agent`.
user_agent: Option<String>,
/// Set by `proxy`.
proxy: Option<String>,
/// Set by `online`; `new` initialises it to `true`.
online: bool,
}
impl Default for MessengerClientBuilder {
fn default() -> Self {
Self::new()
}
}
impl MessengerClientBuilder {
    /// Creates a builder with no session source configured and `online`
    /// set to `true`.
    pub fn new() -> Self {
        Self {
            online: true,
            cookies_file_path: None,
            shared_user_id: None,
            shared_cookie_header: None,
            shared_http: None,
            user_agent: None,
            proxy: None,
        }
    }

    /// Uses the cookies file at `path` as the session source.
    pub fn cookies_file_path(self, path: impl Into<String>) -> Self {
        Self {
            cookies_file_path: Some(path.into()),
            ..self
        }
    }

    /// Uses an already established session (user id, cookie header and HTTP
    /// client) as the session source.
    pub fn shared_session(
        self,
        user_id: impl Into<String>,
        cookie_header: impl Into<String>,
        http: HttpClient,
    ) -> Self {
        Self {
            shared_user_id: Some(user_id.into()),
            shared_cookie_header: Some(cookie_header.into()),
            shared_http: Some(http),
            ..self
        }
    }

    /// Overrides the user agent forwarded when the session state is built.
    pub fn user_agent(self, user_agent: impl Into<String>) -> Self {
        Self {
            user_agent: Some(user_agent.into()),
            ..self
        }
    }

    /// Sets the proxy forwarded when state is loaded from a cookies file.
    pub fn proxy(self, proxy: impl Into<String>) -> Self {
        Self {
            proxy: Some(proxy.into()),
            ..self
        }
    }

    /// Sets the `online` flag forwarded to the MQTT runtime.
    pub fn online(self, online: bool) -> Self {
        Self { online, ..self }
    }

    /// Validates the configuration and creates the client.
    ///
    /// No network or file access happens here; the session state is loaded
    /// lazily by the client.
    ///
    /// # Errors
    ///
    /// Returns [`MessengerError::Config`] when no session source is set, when
    /// a cookies file is combined with shared-session fields, or when the
    /// shared session is only partially populated.
    pub fn build(self) -> Result<MessengerClient, MessengerError> {
        let shared_parts = [
            self.shared_user_id.is_some(),
            self.shared_cookie_header.is_some(),
            self.shared_http.is_some(),
        ];
        let any_shared = shared_parts.iter().any(|present| *present);
        let all_shared = shared_parts.iter().all(|present| *present);
        let has_cookie_path = self.cookies_file_path.is_some();

        // Arms are ordered like the original checks: missing source,
        // conflicting sources, then an incomplete shared session.
        let rejection = match (has_cookie_path, any_shared) {
            (false, false) => Some("cookies_file_path or shared_session must be provided"),
            (true, true) => Some("cookies_file_path cannot be combined with shared_session"),
            (false, true) if !all_shared => {
                Some("shared_session requires user_id, cookie_header, and http client")
            }
            _ => None,
        };
        if let Some(reason) = rejection {
            return Err(MessengerError::Config(reason.to_string()));
        }

        let Self {
            cookies_file_path,
            shared_user_id,
            shared_cookie_header,
            shared_http,
            user_agent,
            proxy,
            online,
        } = self;
        let config = MessengerConfig {
            cookies_file_path,
            shared_user_id,
            shared_cookie_header,
            shared_http,
            user_agent,
            proxy,
            online,
        };

        // The initial receiver is dropped right away; consumers obtain their
        // own through `MessengerClient::subscribe`.
        let (events, _) = broadcast::channel(1024);
        // Random starting request id with the top bit cleared.
        let first_request_id = rand::random::<u64>() & 0x7fff_ffff_ffff_ffff;

        let inner = Arc::new(Inner {
            config,
            state: RwLock::new(None),
            mqtt: Mutex::new(None),
            listener: Mutex::new(None),
            events,
            pending: PendingRequests::new(),
            request_id: AtomicU64::new(first_request_id),
            task_id: AtomicU64::new(1),
        });
        Ok(MessengerClient { inner })
    }
}
impl MessengerClient {
/// Returns a fresh [`MessengerClientBuilder`].
pub fn builder() -> MessengerClientBuilder {
    MessengerClientBuilder::default()
}
/// Returns a new receiver attached to the client's event broadcast channel.
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
    let sender = &self.inner.events;
    sender.subscribe()
}
/// Loads the session state if it has not been loaded yet, without
/// connecting to MQTT.
///
/// # Errors
///
/// Propagates any error from building the state.
pub async fn start(&self) -> Result<(), MessengerError> {
    self.ensure_state().await?;
    Ok(())
}
/// Connects to MQTT and spawns the background task that turns incoming
/// messages into [`Event`]s on the broadcast channel.
///
/// Returns `Ok(())` without doing anything when a listener handle is already
/// stored. That includes a handle whose task has already finished; call
/// [`Self::stop_listening`] first to reconnect.
///
/// # Errors
///
/// Propagates failures from state initialisation and from the MQTT
/// connect / session set-up / polling start-up calls, and returns
/// [`MessengerError::Protocol`] when the runtime yields no receiver.
pub async fn start_listening(&self) -> Result<(), MessengerError> {
    info!("starting messenger listener");
    self.ensure_state().await?;

    // Keep the listener slot locked for the whole start-up sequence. Checking
    // and releasing the lock before connecting would let two concurrent
    // callers both connect; the second would then overwrite the first
    // `JoinHandle`, detaching a dispatcher that keeps emitting duplicate
    // events and can no longer be aborted.
    let mut listener_guard = self.inner.listener.lock().await;
    if listener_guard.is_some() {
        return Ok(());
    }

    let state = {
        let state_guard = self.inner.state.read().await;
        state_guard.clone().ok_or(MessengerError::NotStarted)?
    };
    debug!(
        user_id = state.user_id,
        region = state.region,
        has_sequence = state.sequence_id.is_some(),
        "state ready for mqtt connection"
    );

    let (mut runtime, eventloop) =
        MqttRuntime::connect(&state, self.inner.config.online).await?;
    runtime
        .initialize_session(&state, self.inner.config.online)
        .await?;
    runtime.start_polling(eventloop)?;
    let Some(mut receiver) = runtime.take_receiver() else {
        return Err(MessengerError::Protocol(
            "failed to initialize mqtt receiver".to_string(),
        ));
    };
    {
        let mut mqtt_guard = self.inner.mqtt.lock().await;
        *mqtt_guard = Some(runtime);
    }

    let pending = self.inner.pending.clone();
    let events = self.inner.events.clone();
    let task = tokio::spawn(async move {
        info!("event dispatcher task started");
        // Send errors only mean there are no subscribers right now.
        let _ = events.send(Event::Listening);
        while let Some(message) = receiver.recv().await {
            trace!(
                topic = message.topic.as_str(),
                payload_len = message.payload.len(),
                "dispatcher received mqtt message"
            );
            let raw_payload = String::from_utf8_lossy(&message.payload);
            trace!(
                topic = message.topic.as_str(),
                payload = %raw_payload,
                "received raw mqtt payload"
            );
            // Wake any `send_text` caller waiting on this response before the
            // payload is parsed into events.
            if message.topic == "/ls_resp"
                && let Err(error) = pending.resolve_payload(&message.payload).await
            {
                warn!(?error, "failed to resolve request response payload");
            }
            match parse_events(&message.topic, &message.payload) {
                Ok(parsed) => {
                    trace!(
                        count = parsed.len(),
                        topic = message.topic.as_str(),
                        "parsed events"
                    );
                    if parsed.is_empty() {
                        // Log a bounded preview (first 1024 chars) of payloads
                        // that produced nothing, reusing the decoded text.
                        match message.topic.as_str() {
                            "/t_ms" => {
                                let preview = raw_payload.chars().take(1024).collect::<String>();
                                trace!(
                                    payload = preview,
                                    "received /t_ms payload with no parsed events"
                                );
                            }
                            "/ls_resp" => {
                                let preview = raw_payload.chars().take(1024).collect::<String>();
                                trace!(
                                    payload = preview,
                                    "received /ls_resp payload with no parsed events"
                                );
                            }
                            _ => {}
                        }
                    }
                    for event in parsed {
                        let _ = events.send(event);
                    }
                }
                Err(error) => {
                    let _ = events.send(Event::Error(EventError {
                        kind: EventKind::Parse,
                        message: error.to_string(),
                    }));
                }
            }
        }
        // The receiver closed: tell subscribers the stream has ended.
        let _ = events.send(Event::Disconnect);
    });

    *listener_guard = Some(task);
    Ok(())
}
/// Aborts the dispatcher task (if any) and stops the MQTT runtime (if any).
///
/// Both slots are emptied, so a later [`Self::start_listening`] reconnects.
///
/// # Errors
///
/// Propagates the error returned by stopping the MQTT runtime.
pub async fn stop_listening(&self) -> Result<(), MessengerError> {
    info!("stopping messenger listener");

    // Each guard is a temporary dropped at the end of its statement, so
    // neither lock is held while aborting or stopping.
    let dispatcher = self.inner.listener.lock().await.take();
    if let Some(handle) = dispatcher {
        handle.abort();
    }

    let active_runtime = self.inner.mqtt.lock().await.take();
    if let Some(runtime) = active_runtime {
        runtime.stop().await?;
    }
    Ok(())
}
/// Starts listening and then waits until the dispatcher task has finished
/// or its handle has been removed.
///
/// The listener slot is polled once per second; the lock is released
/// between polls.
///
/// # Errors
///
/// Propagates any error from [`Self::start_listening`].
pub async fn listen(&self) -> Result<(), MessengerError> {
    self.start_listening().await?;
    loop {
        let still_running = self
            .inner
            .listener
            .lock()
            .await
            .as_ref()
            .is_some_and(|task| !task.is_finished());
        if !still_running {
            return Ok(());
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
/// Shuts the client down; currently identical to [`Self::stop_listening`].
///
/// # Errors
///
/// Propagates any error from [`Self::stop_listening`].
pub async fn close(&self) -> Result<(), MessengerError> {
    self.stop_listening().await?;
    Ok(())
}
/// Sends `text` to `thread_id` and waits up to ten seconds for the
/// server's response.
///
/// # Errors
///
/// * [`MessengerError::Config`] when `thread_id` is empty.
/// * [`MessengerError::NotStarted`] when no session state or no MQTT runtime
///   is available.
/// * [`MessengerError::Timeout`] when no response arrives within ten seconds.
/// * [`MessengerError::Protocol`] when the response channel closes first.
/// * Any error from encoding or publishing the request.
pub async fn send_text(
    &self,
    thread_id: &str,
    text: &str,
) -> Result<SendReceipt, MessengerError> {
    if thread_id.is_empty() {
        return Err(MessengerError::Config("thread_id is required".to_string()));
    }

    // Check everything that can fail before registering the pending request.
    // Registering first (as this method used to) left a registration that
    // nothing would ever resolve whenever one of these early returns fired.
    let state = {
        let state_guard = self.inner.state.read().await;
        state_guard.clone().ok_or(MessengerError::NotStarted)?
    };
    let request_id = self.inner.request_id.fetch_add(1, Ordering::Relaxed);
    let task_id = self.inner.task_id.fetch_add(1, Ordering::Relaxed);
    let payload = build_send_text_payload(
        request_id,
        task_id,
        thread_id,
        text,
        &state.ls_app_id,
        &state.ls_version_id,
    );
    let payload = encode_request(&payload)?;

    let receiver = {
        let mqtt_guard = self.inner.mqtt.lock().await;
        let Some(mqtt) = mqtt_guard.as_ref() else {
            return Err(MessengerError::NotStarted);
        };
        // Register before publishing so the response cannot arrive ahead of
        // the waiter.
        // NOTE(review): a failed publish or a timeout below still leaves the
        // registration in `pending`; fixing that needs a removal API on
        // `PendingRequests`, which is not visible here.
        let receiver = self.inner.pending.register(request_id).await;
        mqtt.publish("/ls_req", payload).await?;
        receiver
    };

    let response = timeout(Duration::from_secs(10), receiver)
        .await
        .map_err(|_| MessengerError::Timeout)?
        .map_err(|_| MessengerError::Protocol("request receiver closed".to_string()))?;
    debug!(
        request_id = response.request_id,
        "received response for outbound message"
    );
    Ok(extract_send_receipt(thread_id, response.payload))
}
/// Returns the user id from the session state, or `None` when the state
/// has not been loaded yet.
pub async fn uid(&self) -> Option<String> {
    let guard = self.inner.state.read().await;
    let state = guard.as_ref()?;
    Some(state.user_id.clone())
}
/// Returns the user name from the session state, or `None` when the state
/// has not been loaded yet or carries no name.
pub async fn name(&self) -> Option<String> {
    let guard = self.inner.state.read().await;
    let state = guard.as_ref()?;
    state.user_name.clone()
}
/// Builds and stores the session state unless one is already present.
///
/// The state comes from the cookies file when a path is configured and from
/// the shared session otherwise. No lock is held while the state is being
/// built, so concurrent callers may each build one; the last write wins.
///
/// # Errors
///
/// Propagates errors from the `State` constructors and returns
/// [`MessengerError::Config`] when a shared-session field is missing.
async fn ensure_state(&self) -> Result<(), MessengerError> {
    // The read guard is a temporary of the condition and is released before
    // any of the work below starts.
    if self.inner.state.read().await.is_some() {
        return Ok(());
    }

    let config = &self.inner.config;
    let state = match config.cookies_file_path.as_deref() {
        Some(path) => {
            State::from_cookies_file(
                path,
                config.user_agent.as_deref(),
                config.proxy.as_deref(),
            )
            .await?
        }
        None => {
            let user_id = config
                .shared_user_id
                .clone()
                .ok_or_else(|| MessengerError::Config("missing shared user_id".to_string()))?;
            let cookie_header = config.shared_cookie_header.clone().ok_or_else(|| {
                MessengerError::Config("missing shared cookie_header".to_string())
            })?;
            let http = config.shared_http.clone().ok_or_else(|| {
                MessengerError::Config("missing shared http client".to_string())
            })?;
            State::from_shared(user_id, cookie_header, http, config.user_agent.as_deref())
                .await?
        }
    };

    // Log only which tokens are present, not their values.
    info!(
        user_id = state.user_id,
        has_name = state.user_name.is_some(),
        has_fb_dtsg = state.fb_dtsg.is_some(),
        has_lsd = state.lsd.is_some(),
        has_jazoest = state.jazoest.is_some(),
        has_client_revision = state.client_revision.is_some(),
        "state initialized from cookies"
    );

    *self.inner.state.write().await = Some(state);
    Ok(())
}
}