pub mod builder;
pub mod context;
pub mod dispatcher;
pub mod events;
pub mod listener;
pub mod models;
use std::sync::Arc;
use std::sync::atomic::{
AtomicU64,
Ordering,
};
use rustybook_http::client::Client as HttpClient;
use tokio::sync::{
Mutex,
RwLock,
broadcast,
};
use tokio::task::JoinHandle;
use tokio::time::{
Duration,
timeout,
};
use tracing::{
debug,
trace,
warn,
};
#[cfg(feature = "cache")]
use crate::cache::Cache;
use crate::client::context::Context;
use crate::client::events::{
EventHandler,
dispatch_event,
};
use crate::client::models::User;
use crate::error::MessengerError;
use crate::gateway::events::Event;
use crate::gateway::lightspeed::codec::encode_request;
use crate::gateway::mqtt::Runtime;
use crate::http::send::{
PendingRequests,
build_send_text_payload,
extract_send_receipt,
};
use crate::model::SendReceipt;
use crate::state::State;
pub use builder::MessengerClientBuilder;
#[derive(Debug, Clone)]
struct MessengerConfig {
cookies_file_path: Option<String>,
shared_user_id: Option<String>,
shared_cookie_header: Option<String>,
shared_http: Option<HttpClient>,
user_agent: Option<String>,
proxy: Option<String>,
online: bool,
}
struct Inner {
config: MessengerConfig,
state: RwLock<Option<State>>,
mqtt: Mutex<Option<Runtime>>,
listener: Mutex<Option<JoinHandle<()>>>,
handler: Mutex<Option<JoinHandle<()>>>,
events: broadcast::Sender<Event>,
pending: PendingRequests,
request_id: AtomicU64,
task_id: AtomicU64,
#[cfg(feature = "cache")]
cache: Arc<Cache>,
}
#[derive(Clone)]
pub struct MessengerClient {
inner: Arc<Inner>,
}
impl MessengerClient {
pub fn builder() -> MessengerClientBuilder {
MessengerClientBuilder::new()
}
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.inner.events.subscribe()
}
pub async fn start_with_handler(
&self,
handler: Arc<dyn EventHandler>,
) -> Result<(), MessengerError> {
{
let mut handler_guard = self.inner.handler.lock().await;
if let Some(task) = handler_guard.as_ref() {
if !task.is_finished() {
return Ok(());
}
handler_guard.take();
}
}
let mut receiver = self.subscribe();
self.start().await?;
let ready_user = User {
id: self.uid().await.ok_or(MessengerError::NotStarted)?,
name: self.name().await,
};
let context = Context::new(self.clone());
let task = tokio::spawn(async move {
handler.ready(context.clone(), ready_user.clone()).await;
loop {
match receiver.recv().await {
Ok(Event::Listening) => {}
Ok(event) => {
dispatch_event(handler.as_ref(), context.clone(), &ready_user, event).await;
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
warn!(skipped, "handler lagged behind event stream");
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
let mut handler_guard = self.inner.handler.lock().await;
*handler_guard = Some(task);
Ok(())
}
pub async fn send_text(
&self,
thread_id: &str,
text: &str,
) -> Result<SendReceipt, MessengerError> {
if thread_id.is_empty() {
return Err(MessengerError::Config("thread_id is required".to_string()));
}
let request_id = self.inner.request_id.fetch_add(1, Ordering::Relaxed);
let task_id = self.inner.task_id.fetch_add(1, Ordering::Relaxed);
let receiver = self.inner.pending.register(request_id).await;
let state = {
let state_guard = self.inner.state.read().await;
state_guard.clone().ok_or(MessengerError::NotStarted)?
};
let payload = build_send_text_payload(
request_id,
task_id,
thread_id,
text,
&state.ls_app_id,
&state.ls_version_id,
);
let payload = encode_request(&payload)?;
{
let mqtt_guard = self.inner.mqtt.lock().await;
let Some(mqtt) = mqtt_guard.as_ref() else {
return Err(MessengerError::NotStarted);
};
mqtt.publish("/ls_req", payload).await?;
}
let response = timeout(Duration::from_secs(10), receiver)
.await
.map_err(|_| MessengerError::Timeout)?
.map_err(|_| MessengerError::Protocol("request receiver closed".to_string()))?;
debug!(
request_id = response.request_id,
"received response for outbound message"
);
Ok(extract_send_receipt(thread_id, response.payload))
}
pub async fn uid(&self) -> Option<String> {
let state_guard = self.inner.state.read().await;
state_guard.as_ref().map(|state| state.user_id.clone())
}
pub async fn name(&self) -> Option<String> {
let state_guard = self.inner.state.read().await;
state_guard
.as_ref()
.and_then(|state| state.user_name.clone())
}
pub async fn ensure_state(&self) -> Result<(), MessengerError> {
{
let state_guard = self.inner.state.read().await;
if state_guard.is_some() {
return Ok(());
}
}
let state =
if let Some(path) = self.inner.config.cookies_file_path.as_deref() {
State::from_cookies_file(
path,
self.inner.config.user_agent.as_deref(),
self.inner.config.proxy.as_deref(),
)
.await?
} else {
State::from_shared(
self.inner.config.shared_user_id.clone().ok_or_else(|| {
MessengerError::Config("missing shared user_id".to_string())
})?,
self.inner
.config
.shared_cookie_header
.clone()
.ok_or_else(|| {
MessengerError::Config("missing shared cookie_header".to_string())
})?,
self.inner.config.shared_http.clone().ok_or_else(|| {
MessengerError::Config("missing shared http client".to_string())
})?,
self.inner.config.user_agent.as_deref(),
)
.await?
};
trace!(
user_id = state.user_id,
has_name = state.user_name.is_some(),
has_fb_dtsg = state.fb_dtsg.is_some(),
has_lsd = state.lsd.is_some(),
has_jazoest = state.jazoest.is_some(),
has_client_revision = state.client_revision.is_some(),
"state initialized from cookies"
);
let mut state_guard = self.inner.state.write().await;
*state_guard = Some(state);
Ok(())
}
}