use std::collections::HashSet;
use serde_json::Value;
use tracing::trace;
use super::legacy::{
parse_message_delta,
value_as_i64,
value_as_string,
};
use crate::error::MessengerError;
use crate::gateway::events::{
Event,
MessageEvent,
TypingEvent,
};
use crate::gateway::lightspeed::codec::decode_envelope;
use crate::gateway::lightspeed::op::RespOp;
use crate::gateway::lightspeed::payload::Payload;
pub(super) fn parse_lightspeed(payload: &[u8]) -> Result<Vec<Event>, MessengerError> {
trace!(payload_len = payload.len(), "parsing lightspeed payload");
let envelope = decode_envelope("/ls_resp", payload)?;
let mut output = Vec::new();
trace!(
request_id_type = envelope.request_id.name(),
request_id_int = envelope.request_id.as_u64(),
target = envelope.target,
sp_count = envelope.sp.len(),
"parsed lightspeed envelope"
);
if let Payload::Response(response) = &envelope.payload {
trace!(
name = response.name.as_deref().unwrap_or(""),
operation_count = response.operations.len(),
"parsed lightspeed response payload"
);
for operation in &response.operations {
match operation {
RespOp::UpdateTypingIndicator {
thread_id,
user_id,
is_typing,
} => output.push(Event::Typing(TypingEvent {
user_id: user_id.clone(),
thread_id: Some(thread_id.clone()),
is_typing: *is_typing,
})),
RespOp::InsertMessage {
thread_id,
sender_id,
message_id,
text,
timestamp_ms,
} => output.push(Event::Message(MessageEvent {
message_id: Some(message_id.clone()),
thread_id: thread_id.clone(),
sender_id: sender_id.clone(),
text: text.clone(),
timestamp_ms: *timestamp_ms,
})),
_ => {}
}
}
}
let mut messages = Vec::new();
collect_lightspeed_messages(&envelope.raw, 0, &mut messages);
trace!(
candidate_messages = messages.len(),
"collected lightspeed message candidates"
);
let mut dedup = HashSet::new();
for message in messages {
let key = format!(
"{}|{}|{}|{}|{}",
message.thread_id,
message.sender_id,
message.message_id.as_deref().unwrap_or(""),
message.timestamp_ms.unwrap_or_default(),
message.text.as_deref().unwrap_or("")
);
if dedup.insert(key) {
output.push(Event::Message(message));
}
}
trace!(events = output.len(), "parsed lightspeed events");
Ok(output)
}
/// Recursively walks `value` and appends every message it can parse to
/// `messages`.
///
/// At each node `parse_message_delta` is tried first and
/// `parse_lightspeed_message` only if that yields nothing. Regardless of
/// whether the node itself parsed, the walk continues into object values,
/// array elements, and strings that themselves contain JSON. Nodes deeper
/// than ten levels are not visited.
fn collect_lightspeed_messages(value: &Value, depth: usize, messages: &mut Vec<MessageEvent>) {
    const MAX_DEPTH: usize = 10;
    if depth > MAX_DEPTH {
        return;
    }

    // At most one message per node: the delta parser wins when both match.
    let found = parse_message_delta(value).or_else(|| parse_lightspeed_message(value));
    messages.extend(found);

    let next_depth = depth + 1;
    match value {
        Value::Object(fields) => fields
            .values()
            .for_each(|child| collect_lightspeed_messages(child, next_depth, messages)),
        Value::Array(items) => items
            .iter()
            .for_each(|child| collect_lightspeed_messages(child, next_depth, messages)),
        Value::String(text) => {
            // Strings may carry an embedded JSON document; descend into it.
            if let Some(embedded) = parse_string_json(text) {
                collect_lightspeed_messages(&embedded, next_depth, messages);
            }
        }
        _ => {}
    }
}
/// Attempts to read a message out of a single JSON node.
///
/// Each field is looked up under several alternative key names; the first key
/// that is *present* on the node is the one used, and its value is then
/// converted with `value_as_string` / `value_as_i64`.
///
/// Returns `None` when no thread id can be read, or when the node carries
/// neither a sender id, a message id, nor any text.
fn parse_lightspeed_message(value: &Value) -> Option<MessageEvent> {
    /// Returns the value stored under the first of `keys` that exists on `node`.
    fn first_present<'a>(node: &'a Value, keys: &[&str]) -> Option<&'a Value> {
        keys.iter().find_map(|key| node.get(*key))
    }

    let thread_id = first_present(value, &["thread_id", "thread", "thread_fbid"])
        .and_then(value_as_string)?;

    let sender_id = first_present(
        value,
        &["sender_id", "actor_id", "author", "from", "sender_fbid"],
    )
    .and_then(value_as_string)
    .unwrap_or_default();

    let message_id = first_present(
        value,
        &["message_id", "messageId", "mid", "offline_threading_id"],
    )
    .and_then(value_as_string);

    let text = first_present(value, &["text", "body", "snippet"]).and_then(value_as_string);

    let timestamp_ms = first_present(value, &["timestamp_ms", "timestamp", "message_timestamp"])
        .and_then(value_as_i64);

    // A bare thread id with nothing else attached is not treated as a message.
    let has_content = !sender_id.is_empty() || message_id.is_some() || text.is_some();
    if !has_content {
        return None;
    }

    Some(MessageEvent {
        message_id,
        thread_id,
        sender_id,
        text,
        timestamp_ms,
    })
}
/// Parses `raw` as JSON when, after trimming whitespace, it begins with `{`
/// or `[`.
///
/// Returns `None` for any other leading character (including an empty string)
/// and for text that fails to parse.
fn parse_string_json(raw: &str) -> Option<Value> {
    let candidate = raw.trim();
    match candidate.as_bytes().first() {
        // Only object- or array-shaped text is worth handing to the parser.
        Some(b'{') | Some(b'[') => serde_json::from_str::<Value>(candidate).ok(),
        _ => None,
    }
}